Repository: spark
Updated Branches:
  refs/heads/master a9c4e2995 -> 3a180c19a


[SPARK-6629] cancelJobGroup() may not work for jobs whose job groups are 
inherited from parent threads

When a job is submitted with a job group and that job group is inherited from a 
parent thread, there are multiple bugs that may prevent this job from being 
cancelable via `SparkContext.cancelJobGroup()`:

- When filtering jobs based on their job group properties, DAGScheduler calls 
`get()` instead of `getProperty()`. Unlike `getProperty()`, `get()` does not 
respect inheritance, so it skips over jobs whose job group properties were 
inherited.
- `Properties` objects are mutable, but we do not make defensive copies / 
snapshots, so modifications of the parent thread's job group will cause running 
jobs' groups to change; this also breaks cancellation.

Both of these issues are easy to fix: use `getProperty()` and perform defensive 
copying.

Author: Josh Rosen <[email protected]>

Closes #5288 from JoshRosen/localProperties-mutability-race and squashes the 
following commits:

9e29654 [Josh Rosen] Fix style issue
5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
localProperties-mutability-race
3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler
707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking 
job cancellation.
b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group 
properties from being cancelled.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a180c19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a180c19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a180c19

Branch: refs/heads/master
Commit: 3a180c19a4ef165366186e23d8e8844c5baaecdd
Parents: a9c4e29
Author: Josh Rosen <[email protected]>
Authored: Wed Apr 29 13:31:52 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Apr 29 13:31:52 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 15 +++++--
 .../org/apache/spark/JobCancellationSuite.scala | 35 +++++++++++++++
 .../scala/org/apache/spark/ThreadingSuite.scala | 46 +++++++++++++++++++-
 3 files changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a180c19/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b511c30..05b8ab0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -28,6 +28,8 @@ import scala.language.existentials
 import scala.language.postfixOps
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang3.SerializationUtils
+
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
@@ -510,7 +512,8 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
+      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+      SerializationUtils.clone(properties)))
     waiter
   }
 
@@ -547,7 +550,8 @@ class DAGScheduler(
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
     eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
+      SerializationUtils.clone(properties)))
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -704,8 +708,11 @@ class DAGScheduler(
   private[scheduler] def handleJobGroupCancelled(groupId: String) {
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
-    val activeInGroup = activeJobs.filter(activeJob =>
-      Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
+    val activeInGroup = activeJobs.filter { activeJob =>
+      Option(activeJob.properties).exists {
+        _.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId
+      }
+    }
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()

http://git-wip-us.apache.org/repos/asf/spark/blob/3a180c19/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 4d3e097..ae17fc6 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }
 
+  test("inherited job group (SPARK-6629)") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    sc.setJobGroup("jobA", "this is a job to be cancelled")
+    @volatile var exception: Exception = null
+    val jobA = new Thread() {
+      // The job group should be inherited by this thread
+      override def run(): Unit = {
+        exception = intercept[SparkException] {
+          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+        }
+      }
+    }
+    jobA.start()
+
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+    sc.cancelJobGroup("jobA")
+    jobA.join(10000)
+    assert(!jobA.isAlive)
+    assert(exception.getMessage contains "cancel")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    assert(jobB.get() === 100)
+  }
+
   test("job group with interruption") {
     sc = new SparkContext("local[2]", "test")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3a180c19/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index b5383d5..10917c8 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{TimeUnit, Semaphore}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.scheduler._
 import org.scalatest.FunSuite
 
 /**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
     assert(sc.getLocalProperty("test") === "parent")
     assert(sc.getLocalProperty("Foo") === null)
   }
+
+  test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
+    val jobStarted = new Semaphore(0)
+    val jobEnded = new Semaphore(0)
+    @volatile var jobResult: JobResult = null
+
+    sc = new SparkContext("local", "test")
+    sc.setJobGroup("originalJobGroupId", "description")
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.release()
+      }
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        jobEnded.release()
+      }
+    })
+
+    // Create a new thread which will inherit the current thread's properties
+    val thread = new Thread() {
+      override def run(): Unit = {
+        assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
+        // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
+        try {
+          sc.parallelize(1 to 100).foreach { x =>
+            Thread.sleep(100)
+          }
+        } catch {
+          case s: SparkException => // ignored so that we don't print noise in test logs
+        }
+      }
+    }
+    thread.start()
+    // Wait for the job to start, then mutate the original properties, which should have been
+    // inherited by the running job but hopefully defensively copied or snapshotted:
+    jobStarted.tryAcquire(10, TimeUnit.SECONDS)
+    sc.setJobGroup("modifiedJobGroupId", "description")
+    // Canceling the original job group should cancel the running job. In other words, the
+    // modification of the properties object should not affect the properties of running jobs
+    sc.cancelJobGroup("originalJobGroupId")
+    jobEnded.tryAcquire(10, TimeUnit.SECONDS)
+    assert(jobResult.isInstanceOf[JobFailed])
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to