Repository: spark
Updated Branches:
  refs/heads/branch-1.5 612b4609b -> 8d2624790


[SPARK-9968] [STREAMING] Reduced time spent within synchronized block to 
prevent lock starvation

When the rate limiter is actually limiting the rate at which data is inserted 
into the buffer, the synchronized block of BlockGenerator.addData stays blocked 
for long time. This causes the thread switching the buffer and generating 
blocks (synchronized with addData) to starve and not generate blocks for 
seconds. The correct solution is to not block on the rate limiter within the 
synchronized block for adding data to the buffer.

Author: Tathagata Das <[email protected]>

Closes #8204 from tdas/SPARK-9968 and squashes the following commits:

8cbcc1b [Tathagata Das] Removed unused val
a73b645 [Tathagata Das] Reduced time spent within synchronized block

(cherry picked from commit 18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d262479
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d262479
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d262479

Branch: refs/heads/branch-1.5
Commit: 8d26247903a1b594df6e202f0834ed165f47bbdc
Parents: 612b460
Author: Tathagata Das <[email protected]>
Authored: Fri Aug 14 15:54:14 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Fri Aug 14 15:54:33 2015 -0700

----------------------------------------------------------------------
 .../streaming/receiver/BlockGenerator.scala     | 40 ++++++++++++++++----
 1 file changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d262479/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 794dece..300e820 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -155,10 +155,17 @@ private[streaming] class BlockGenerator(
   /**
    * Push a single data item into the buffer.
    */
-  def addData(data: Any): Unit = synchronized {
+  def addData(data: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")
@@ -169,11 +176,18 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. After buffering the data, the
    * `BlockGeneratorListener.onAddData` callback will be called.
    */
-  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+  def addDataWithCallback(data: Any, metadata: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
-      listener.onAddData(data, metadata)
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+          listener.onAddData(data, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")
@@ -185,13 +199,23 @@ private[streaming] class BlockGenerator(
    * `BlockGeneratorListener.onAddData` callback will be called. Note that all 
the data items
    * are atomically added to the buffer, and are hence guaranteed to be 
present in a single block.
    */
-  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): 
Unit = synchronized {
+  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): 
Unit = {
     if (state == Active) {
+      // Unroll iterator into a temp buffer, and wait for pushing in the 
process
+      val tempBuffer = new ArrayBuffer[Any]
       dataIterator.foreach { data =>
         waitToPush()
-        currentBuffer += data
+        tempBuffer += data
+      }
+      synchronized {
+        if (state == Active) {
+          currentBuffer ++= tempBuffer
+          listener.onAddData(tempBuffer, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
       }
-      listener.onAddData(dataIterator, metadata)
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to