Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d1e22b386 -> 26f6b9893


[STREAMING] SPARK-2009 Key not found exception when slow receiver starts

I got a "java.util.NoSuchElementException: key not found: 1401756085000 ms" 
exception when using a Kafka stream with a 1 sec batchPeriod.

Investigation showed that the cause is that ReceiverLauncher.startReceivers 
is asynchronous (it is started in a separate thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

A slow-starting receiver, such as Kafka, can easily take more than 2 sec to 
start. As a result, "compute" is not called even once on 
ReceiverInputDStream before the first batch job is executed, so 
receivedBlockInfo remains empty. The batch job then calls 
ReceiverInputDStream.getReceivedBlockInfo, which throws the "key not found" 
exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.
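
To illustrate the difference between the two lookups, here is a minimal, 
self-contained sketch. It assumes receivedBlockInfo behaves like a mutable 
HashMap keyed by batch time; the Time and ReceivedBlockInfo case classes and 
the MissingKeyDemo object are hypothetical stand-ins, not the Spark classes.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins for Spark's Time and ReceivedBlockInfo types.
case class Time(millis: Long)
case class ReceivedBlockInfo(blockId: String)

object MissingKeyDemo {
  // Assumed shape of the map; it stays empty until "compute" records a batch.
  val receivedBlockInfo = new mutable.HashMap[Time, Array[ReceivedBlockInfo]]

  // Lookup before the patch: apply() throws NoSuchElementException
  // when no entry exists for the requested batch time.
  def strictLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo(time)

  // Lookup after the patch: a missing entry yields an empty array.
  def tolerantLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])

  def main(args: Array[String]): Unit = {
    // Batch time for which the slow receiver has not reported anything yet.
    val batchTime = Time(1401756085000L)
    println(s"strict lookup failed: ${Try(strictLookup(batchTime)).isFailure}")
    println(s"tolerant lookup size: ${tolerantLookup(batchTime).length}")
  }
}
```

With the tolerant lookup, a batch that runs before the receiver has started 
sees zero blocks instead of failing.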

Author: Vadim Chekan <[email protected]>

Closes #961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, 
it is possible that getReceivedBlockInfo will be called before compute has been 
called


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f6b989
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f6b989
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f6b989

Branch: refs/heads/branch-1.0
Commit: 26f6b989312a9a48a27a23ecc68702bd14032e55
Parents: d1e22b3
Author: Vadim Chekan <[email protected]>
Authored: Tue Jun 17 22:03:50 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Tue Jun 17 22:03:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/ReceiverInputDStream.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26f6b989/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 75cabdb..391e409 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -74,7 +74,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
 
   /** Get information on received blocks. */
   private[streaming] def getReceivedBlockInfo(time: Time) = {
-    receivedBlockInfo(time)
+    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
   }
 
   /**
