Repository: spark
Updated Branches:
  refs/heads/master 883b7e903 -> acef51def


[SPARK-6537] UIWorkloadGenerator: The main thread should not stop SparkContext 
until all jobs finish

The main thread of UIWorkloadGenerator spawns sub-threads to launch jobs, but it 
stops the SparkContext without waiting for those threads to finish.

Author: Kousuke Saruta <[email protected]>

Closes #5187 from sarutak/SPARK-6537 and squashes the following commits:

4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stops 
SparkContext after all jobs finish


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acef51de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acef51de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acef51de

Branch: refs/heads/master
Commit: acef51defb991bcdc99b76cf2a126afd6d60ec70
Parents: 883b7e9
Author: Kousuke Saruta <[email protected]>
Authored: Wed Mar 25 13:27:15 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Wed Mar 25 13:27:15 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/UIWorkloadGenerator.scala     | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acef51de/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 19ac7a8..5fbcd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui
 
+import java.util.concurrent.Semaphore
+
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
       ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
     )
 
+    val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
     (1 to nJobSet).foreach { _ =>
       for ((desc, job) <- jobs) {
         new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
             } catch {
               case e: Exception =>
                 println("Job Failed: " + desc)
+            } finally {
+              barrier.release()
             }
           }
         }.start
         Thread.sleep(INTER_JOB_WAIT_MS)
       }
     }
+
+    // Waiting for threads.
+    barrier.acquire()
     sc.stop()
   }
 }
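
The diff above waits on a Semaphore created with a negative permit count. A minimal standalone sketch of that pattern follows, assuming plain threads and no Spark at all; the names SemaphoreBarrierSketch, runAndWait and tasks are hypothetical and are not part of UIWorkloadGenerator. With n tasks the semaphore starts at -n + 1 permits, so the single acquire() can only succeed once every task has called release().

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical illustration only; not code from the Spark repository.
object SemaphoreBarrierSketch {

  /** Runs each task on its own thread, blocks until all have finished,
    * and returns how many tasks had finished when the caller was unblocked. */
  def runAndWait(tasks: Seq[() => Unit]): Int = {
    val finished = new AtomicInteger(0)

    // Start at -n + 1 permits: n calls to release() raise the count to 1,
    // which is the first moment the acquire() below can succeed.
    val barrier = new Semaphore(-tasks.size + 1)

    tasks.foreach { task =>
      new Thread {
        override def run(): Unit = {
          try {
            task()
          } catch {
            case e: Exception => println("Task failed: " + e.getMessage)
          } finally {
            // Release in finally so a failing task still counts as finished.
            finished.incrementAndGet()
            barrier.release()
          }
        }
      }.start()
    }

    // Blocks the calling thread until every task thread has released.
    barrier.acquire()
    finished.get()
  }

  def main(args: Array[String]): Unit = {
    val tasks = Seq[() => Unit](
      () => Thread.sleep(50),
      () => throw new RuntimeException("boom"),
      () => Thread.sleep(10)
    )
    val done = runAndWait(tasks)
    println(s"$done of ${tasks.size} tasks finished before shutdown")
  }
}
```

Releasing inside finally mirrors the commit: a job that throws still releases its permit, so the waiting thread is not left blocked. A java.util.concurrent.CountDownLatch initialised to the number of tasks would express the same wait; the commit itself uses Semaphore.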

