Repository: spark
Updated Branches:
  refs/heads/branch-1.0 30cfa8dc1 -> ee633210c


[SPARK-1745] Move interrupted flag from TaskContext constructor (minor)

It makes little sense to start a TaskContext that is interrupted. Indeed, I 
searched for all use cases of it and didn't find a single instance in which 
`interrupted` is true on construction.

This was inspired by reviewing #640, which adds an additional `@volatile var 
completed` that is similar. These are not the most urgent changes, but I wanted 
to push them out before I forget.

Author: Andrew Or <[email protected]>

Closes #675 from andrewor14/task-context and squashes the following commits:

9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor

(cherry picked from commit c3f8b78c211df6c5adae74f37e39fb55baeff723)
Signed-off-by: Aaron Davidson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee633210
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee633210
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee633210

Branch: refs/heads/branch-1.0
Commit: ee633210cc92c62865333ab24356716c1ab57944
Parents: 30cfa8d
Author: Andrew Or <[email protected]>
Authored: Thu May 8 12:13:07 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Thu May 8 12:13:59 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskContext.scala    | 20 +++++++++++---------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  3 +--
 .../java/org/apache/spark/JavaAPISuite.java     |  2 +-
 .../org/apache/spark/CacheManagerSuite.scala    | 10 +++-------
 .../scala/org/apache/spark/PipedRDDSuite.scala  |  4 +---
 5 files changed, 17 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee633210/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index fc48127..51f40c3 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics
  */
 @DeveloperApi
 class TaskContext(
-  val stageId: Int,
-  val partitionId: Int,
-  val attemptId: Long,
-  val runningLocally: Boolean = false,
-  @volatile var interrupted: Boolean = false,
-  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
-) extends Serializable {
+    val stageId: Int,
+    val partitionId: Int,
+    val attemptId: Long,
+    val runningLocally: Boolean = false,
+    private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends Serializable {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId
@@ -42,7 +41,10 @@ class TaskContext(
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
-  // Set to true when the task is completed, before the onCompleteCallbacks are executed.
+  // Whether the corresponding task has been killed.
+  @volatile var interrupted: Boolean = false
+
+  // Whether the task has completed, before the onCompleteCallbacks are executed.
   @volatile var completed: Boolean = false
 
   /**
@@ -58,6 +60,6 @@ class TaskContext(
   def executeOnCompleteCallbacks() {
     completed = true
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach{_()}
+    onCompleteCallbacks.reverse.foreach { _() }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee633210/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2259df0..4b0324f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -23,7 +23,6 @@ import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.HashMap
-import scala.util.Try
 
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
@@ -70,7 +69,7 @@ private[spark] object ShuffleMapTask {
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.
-  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
+  def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = {
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val objIn = new ObjectInputStream(in)
     val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/ee633210/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c3e03ce..1912015 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -597,7 +597,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void iterator() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics());
+    TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics());
     Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee633210/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index fd5b090..4f178db 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -23,7 +23,6 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage._
 
 // TODO: Test the CacheManager's thread-safety aspects
@@ -59,8 +58,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
@@ -72,8 +70,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(5, 6, 7))
     }
@@ -86,8 +83,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0, runningLocally = true)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee633210/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 0bb6a6b..db56a4a 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -178,14 +178,12 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       }
       val hadoopPart1 = generateFakeHadoopPartition()
       val pipedRdd = new PipedRDD(nums, "printenv " + varName)
-      val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val tContext = new TaskContext(0, 0, 0)
       val rddIter = pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
       assert(arr(0) == "/some/path")
     } else {
       // printenv isn't available so just pass the test
-      assert(true)
     }
   }
 

Reply via email to