Repository: spark Updated Branches: refs/heads/master 883b7e903 -> acef51def
[SPARK-6537] UIWorkloadGenerator: The main thread should not stop SparkContext until all jobs finish The main thread of UIWorkloadGenerator spawns sub-threads to launch jobs, but the main thread stops SparkContext without waiting for those threads to finish. Author: Kousuke Saruta <[email protected]> Closes #5187 from sarutak/SPARK-6537 and squashes the following commits: 4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stop SparkContext after all jobs finish Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acef51de Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acef51de Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acef51de Branch: refs/heads/master Commit: acef51defb991bcdc99b76cf2a126afd6d60ec70 Parents: 883b7e9 Author: Kousuke Saruta <[email protected]> Authored: Wed Mar 25 13:27:15 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Wed Mar 25 13:27:15 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/ui/UIWorkloadGenerator.scala | 9 +++++++++ 1 file changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/acef51de/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 19ac7a8..5fbcd6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui +import java.util.concurrent.Semaphore + import scala.util.Random import org.apache.spark.{SparkConf, SparkContext} @@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator { ("Job with 
delays", baseData.map(x => Thread.sleep(100)).count) ) + val barrier = new Semaphore(-nJobSet * jobs.size + 1) + (1 to nJobSet).foreach { _ => for ((desc, job) <- jobs) { new Thread { @@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator { } catch { case e: Exception => println("Job Failed: " + desc) + } finally { + barrier.release() } } }.start Thread.sleep(INTER_JOB_WAIT_MS) } } + + // Waiting for threads. + barrier.acquire() sc.stop() } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
