Repository: spark
Updated Branches:
  refs/heads/branch-1.5 648074096 -> 5186ec8ac


[SPARK-11063] [STREAMING] Change preferredLocations of Receiver's RDD to hosts 
rather than hostports

An RDD's preferredLocations must be hostnames, but the Streaming Receiver's 
scheduled executors are in host:port format, so the preferences never match.

This PR converts `scheduledExecutors` to `hosts` before creating Receiver's RDD.

Author: zsxwing <[email protected]>

Closes #9075 from zsxwing/SPARK-11063.

(cherry picked from commit 67582132bffbaaeaadc5cf8218f6239d03c39da0)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5186ec8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5186ec8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5186ec8a

Branch: refs/heads/branch-1.5
Commit: 5186ec8aca53ffdffbd41599b9fe1f3c5902de01
Parents: 6480740
Author: zsxwing <[email protected]>
Authored: Mon Oct 19 15:35:14 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon Oct 19 15:35:46 2015 -0700

----------------------------------------------------------------------
 .../scheduler/ReceiverSchedulingPolicy.scala    |  3 ++-
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +++-
 .../scheduler/ReceiverTrackerSuite.scala        | 24 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5186ec8a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 10b5a7f..d2b0be7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -21,6 +21,7 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are 
two phases for
@@ -79,7 +80,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(_.split(":")(0))
+    val hostToExecutors = executors.groupBy(executor => 
Utils.parseHostPort(executor)._1)
     val scheduledExecutors = Array.fill(receivers.length)(new 
mutable.ArrayBuffer[String])
     val numReceiversOnExecutor = mutable.HashMap[String, Int]()
     // Set the initial value to 0

http://git-wip-us.apache.org/repos/asf/spark/blob/5186ec8a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 204e614..7b8b68a 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -551,7 +551,9 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
         if (scheduledExecutors.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
+          val preferredLocations =
+            scheduledExecutors.map(hostPort => 
Utils.parseHostPort(hostPort)._1).distinct
+          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")
       ssc.sparkContext.setJobDescription(s"Streaming job running receiver 
$receiverId")

http://git-wip-us.apache.org/repos/asf/spark/blob/5186ec8a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 45138b7..fda86ae 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, 
TaskLocality}
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("SPARK-11063: TaskSetManager should use Receiver RDD's 
preferredLocations") {
+    // Use ManualClock to prevent from starting batches so that we can make 
sure the only task is
+    // for starting the Receiver
+    val _conf = conf.clone.set("spark.streaming.clock", 
"org.apache.spark.util.ManualClock")
+    withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc 
=>
+      @volatile var receiverTaskLocality: TaskLocality = null
+      ssc.sparkContext.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+          receiverTaskLocality = taskStart.taskInfo.taskLocality
+        }
+      })
+      val input = ssc.receiverStream(new TestReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // If preferredLocations is set correctly, receiverTaskLocality should 
be NODE_LOCAL
+        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+      }
+    }
+  }
 }
 
 /** An input DStream with for testing rate controlling */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to