Repository: spark Updated Branches: refs/heads/branch-1.4 0be6e3b3e -> 130ec219a
[SPARK-7788] Made KinesisReceiver.onStart() non-blocking KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes #6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking (cherry picked from commit 1c388a9985999e043fa002618a357bc8f0a8b65a) Signed-off-by: Tathagata Das <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/130ec219 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/130ec219 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/130ec219 Branch: refs/heads/branch-1.4 Commit: 130ec219aa40cd8cebf4105053d4c92d840e127e Parents: 0be6e3b Author: Tathagata Das <[email protected]> Authored: Fri May 22 17:39:01 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Fri May 22 17:39:09 2015 -0700 ---------------------------------------------------------------------- .../streaming/kinesis/KinesisReceiver.scala | 30 ++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/130ec219/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 800202e..7dd8bfd 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import scala.util.control.NonFatal + import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} @@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver( */ private var worker: Worker = null + /** Thread running the worker */ + private var workerThread: Thread = null + /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). @@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver( } worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) - worker.run() - + workerThread = new Thread() { + override def run(): Unit = { + try { + worker.run() + } catch { + case NonFatal(e) => + restart("Error running the KCL worker in Receiver", e) + } + } + } + workerThread.setName(s"Kinesis Receiver ${streamId}") + workerThread.setDaemon(true) + workerThread.start() logInfo(s"Started receiver with workerId $workerId") } @@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver( * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. */ override def onStop() { - if (worker != null) { - worker.shutdown() + if (workerThread != null) { + if (worker != null) { + worker.shutdown() + worker = null + } + workerThread.join() + workerThread = null logInfo(s"Stopped receiver for workerId $workerId") - worker = null } workerId = null } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
