http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
deleted file mode 100644
index d4881b1..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.api.{FetchRequestBuilder, FetchResponse}
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.consumer.SimpleConsumer
-import kafka.message.{MessageAndMetadata, MessageAndOffset}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.NextIterator
-
-/**
- * A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param messageHandler function for translating each message into the desired type
- */
-private[kafka]
-class KafkaRDD[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag,
-  R: ClassTag] private[spark] (
-    sc: SparkContext,
-    kafkaParams: Map[String, String],
-    val offsetRanges: Array[OffsetRange],
-    leaders: Map[TopicAndPartition, (String, Int)],
-    messageHandler: MessageAndMetadata[K, V] => R
-  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
-  override def getPartitions: Array[Partition] = {
-    offsetRanges.zipWithIndex.map { case (o, i) =>
-        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
-        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
-    }.toArray
-  }
-
-  override def count(): Long = offsetRanges.map(_.count).sum
-
-  override def countApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-    val c = count
-    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
-
-  override def isEmpty(): Boolean = count == 0L
-
-  override def take(num: Int): Array[R] = {
-    val nonEmptyPartitions = this.partitions
-      .map(_.asInstanceOf[KafkaRDDPartition])
-      .filter(_.count > 0)
-
-    if (num < 1 || nonEmptyPartitions.isEmpty) {
-      return new Array[R](0)
-    }
-
-    // Determine in advance how many messages need to be taken from each partition
-    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) 
=>
-      val remain = num - result.values.sum
-      if (remain > 0) {
-        val taken = Math.min(remain, part.count)
-        result + (part.index -> taken.toInt)
-      } else {
-        result
-      }
-    }
-
-    val buf = new ArrayBuffer[R]
-    val res = context.runJob(
-      this,
-      (tc: TaskContext, it: Iterator[R]) => 
it.take(parts(tc.partitionId)).toArray,
-      parts.keys.toArray)
-    res.foreach(buf ++= _)
-    buf.toArray
-  }
-
-  override def getPreferredLocations(thePart: Partition): Seq[String] = {
-    val part = thePart.asInstanceOf[KafkaRDDPartition]
-    // TODO is additional hostname resolution necessary here
-    Seq(part.host)
-  }
-
-  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
-    s"Beginning offset ${part.fromOffset} is after the ending offset 
${part.untilOffset} " +
-      s"for topic ${part.topic} partition ${part.partition}. " +
-      "You either provided an invalid fromOffset, or the Kafka topic has been 
damaged"
-
-  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
-    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
-    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
-    " This should not happen, and indicates that messages may have been lost"
-
-  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): 
String =
-    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
-    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
-    " This should not happen, and indicates a message may have been skipped"
-
-  override def compute(thePart: Partition, context: TaskContext): Iterator[R] 
= {
-    val part = thePart.asInstanceOf[KafkaRDDPartition]
-    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
-    if (part.fromOffset == part.untilOffset) {
-      log.info(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
-        s"skipping ${part.topic} ${part.partition}")
-      Iterator.empty
-    } else {
-      new KafkaRDDIterator(part, context)
-    }
-  }
-
-  private class KafkaRDDIterator(
-      part: KafkaRDDPartition,
-      context: TaskContext) extends NextIterator[R] {
-
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
-      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-    val kc = new KafkaCluster(kafkaParams)
-    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[V]]
-    val consumer = connectLeader
-    var requestOffset = part.fromOffset
-    var iter: Iterator[MessageAndOffset] = null
-
-    // The idea is to use the provided preferred host, except on task retry attempts,
-    // to minimize number of kafka metadata requests
-    private def connectLeader: SimpleConsumer = {
-      if (context.attemptNumber > 0) {
-        kc.connectLeader(part.topic, part.partition).fold(
-          errs => throw new SparkException(
-            s"Couldn't connect to leader for topic ${part.topic} 
${part.partition}: " +
-              errs.mkString("\n")),
-          consumer => consumer
-        )
-      } else {
-        kc.connect(part.host, part.port)
-      }
-    }
-
-    private def handleFetchErr(resp: FetchResponse) {
-      if (resp.hasError) {
-        val err = resp.errorCode(part.topic, part.partition)
-        if (err == ErrorMapping.LeaderNotAvailableCode ||
-          err == ErrorMapping.NotLeaderForPartitionCode) {
-          log.error(s"Lost leader for topic ${part.topic} partition 
${part.partition}, " +
-            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
-          Thread.sleep(kc.config.refreshLeaderBackoffMs)
-        }
-        // Let normal rdd retry sort out reconnect attempts
-        throw ErrorMapping.exceptionFor(err)
-      }
-    }
-
-    private def fetchBatch: Iterator[MessageAndOffset] = {
-      val req = new FetchRequestBuilder()
-        .addFetch(part.topic, part.partition, requestOffset, 
kc.config.fetchMessageMaxBytes)
-        .build()
-      val resp = consumer.fetch(req)
-      handleFetchErr(resp)
-      // kafka may return a batch that starts before the requested offset
-      resp.messageSet(part.topic, part.partition)
-        .iterator
-        .dropWhile(_.offset < requestOffset)
-    }
-
-    override def close(): Unit = {
-      if (consumer != null) {
-        consumer.close()
-      }
-    }
-
-    override def getNext(): R = {
-      if (iter == null || !iter.hasNext) {
-        iter = fetchBatch
-      }
-      if (!iter.hasNext) {
-        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
-        finished = true
-        null.asInstanceOf[R]
-      } else {
-        val item = iter.next()
-        if (item.offset >= part.untilOffset) {
-          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, 
part))
-          finished = true
-          null.asInstanceOf[R]
-        } else {
-          requestOffset = item.nextOffset
-          messageHandler(new MessageAndMetadata(
-            part.topic, part.partition, item.message, item.offset, keyDecoder, 
valueDecoder))
-        }
-      }
-    }
-  }
-}
-
-private[kafka]
-object KafkaRDD {
-  import KafkaCluster.LeaderOffset
-
-  /**
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
-   *  starting point of the batch
-   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
-   *  ending point of the batch
-   * @param messageHandler function for translating each message into the desired type
-   */
-  def apply[
-    K: ClassTag,
-    V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      fromOffsets: Map[TopicAndPartition, Long],
-      untilOffsets: Map[TopicAndPartition, LeaderOffset],
-      messageHandler: MessageAndMetadata[K, V] => R
-    ): KafkaRDD[K, V, U, T, R] = {
-    val leaders = untilOffsets.map { case (tp, lo) =>
-        tp -> (lo.host, lo.port)
-    }.toMap
-
-    val offsetRanges = fromOffsets.map { case (tp, fo) =>
-        val uo = untilOffsets(tp)
-        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
-    }.toArray
-
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, 
messageHandler)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
deleted file mode 100644
index 02917be..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.Partition
-
-/**
- * @param topic kafka topic name
- * @param partition kafka partition id
- * @param fromOffset inclusive starting offset
- * @param untilOffset exclusive ending offset
- * @param host preferred kafka host, i.e. the leader at the time the rdd was created
- * @param port preferred kafka host's port
- */
-private[kafka]
-class KafkaRDDPartition(
-  val index: Int,
-  val topic: String,
-  val partition: Int,
-  val fromOffset: Long,
-  val untilOffset: Long,
-  val host: String,
-  val port: Int
-) extends Partition {
-  /** Number of messages this partition refers to */
-  def count(): Long = untilOffset - fromOffset
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
deleted file mode 100644
index d9d4240..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-import java.lang.{Integer => JInt}
-import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.control.NonFatal
-
-import kafka.admin.AdminUtils
-import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
-
-/**
- * This is a helper class for Kafka test suites. This has the functionality to set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- *
- * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
- */
-private[kafka] class KafkaTestUtils extends Logging {
-
-  // Zookeeper related configurations
-  private val zkHost = "localhost"
-  private var zkPort: Int = 0
-  private val zkConnectionTimeout = 60000
-  private val zkSessionTimeout = 6000
-
-  private var zookeeper: EmbeddedZookeeper = _
-
-  private var zkClient: ZkClient = _
-
-  // Kafka broker related configurations
-  private val brokerHost = "localhost"
-  private var brokerPort = 9092
-  private var brokerConf: KafkaConfig = _
-
-  // Kafka broker server
-  private var server: KafkaServer = _
-
-  // Kafka producer
-  private var producer: Producer[String, String] = _
-
-  // Flag to test whether the system is correctly started
-  private var zkReady = false
-  private var brokerReady = false
-
-  def zkAddress: String = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper address")
-    s"$zkHost:$zkPort"
-  }
-
-  def brokerAddress: String = {
-    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get 
broker address")
-    s"$brokerHost:$brokerPort"
-  }
-
-  def zookeeperClient: ZkClient = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper client")
-    Option(zkClient).getOrElse(
-      throw new IllegalStateException("Zookeeper client is not yet 
initialized"))
-  }
-
-  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
-  private def setupEmbeddedZookeeper(): Unit = {
-    // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
-    // Get the actual zookeeper binding port
-    zkPort = zookeeper.actualPort
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout,
-      ZKStringSerializer)
-    zkReady = true
-  }
-
-  // Set up the Embedded Kafka server
-  private def setupEmbeddedKafkaServer(): Unit = {
-    assert(zkReady, "Zookeeper should be set up beforehand")
-
-    // Kafka broker startup
-    Utils.startServiceOnPort(brokerPort, port => {
-      brokerPort = port
-      brokerConf = new KafkaConfig(brokerConfiguration)
-      server = new KafkaServer(brokerConf)
-      server.startup()
-      (server, port)
-    }, new SparkConf(), "KafkaBroker")
-
-    brokerReady = true
-  }
-
-  /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
-  def setup(): Unit = {
-    setupEmbeddedZookeeper()
-    setupEmbeddedKafkaServer()
-  }
-
-  /** Teardown the whole servers, including Kafka broker and Zookeeper */
-  def teardown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (producer != null) {
-      producer.close()
-      producer = null
-    }
-
-    if (server != null) {
-      server.shutdown()
-      server = null
-    }
-
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (zookeeper != null) {
-      zookeeper.shutdown()
-      zookeeper = null
-    }
-  }
-
-  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-    AdminUtils.createTopic(zkClient, topic, partitions, 1)
-    // wait until metadata is propagated
-    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, 
p) }
-  }
-
-  /** Single-argument version for backwards compatibility */
-  def createTopic(topic: String): Unit = createTopic(topic, 1)
-
-  /** Java-friendly function for sending messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, 
Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /** Send the messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
-    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) 
}.toArray
-    sendMessages(topic, messages)
-  }
-
-  /** Send the array of messages to the Kafka broker */
-  def sendMessages(topic: String, messages: Array[String]): Unit = {
-    producer = new Producer[String, String](new 
ProducerConfig(producerConfiguration))
-    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) 
}: _*)
-    producer.close()
-    producer = null
-  }
-
-  private def brokerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("broker.id", "0")
-    props.put("host.name", "localhost")
-    props.put("port", brokerPort.toString)
-    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkAddress)
-    props.put("log.flush.interval.messages", "1")
-    props.put("replica.socket.timeout.ms", "1500")
-    props
-  }
-
-  private def producerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerAddress)
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    // wait for all in-sync replicas to ack sends
-    props.put("request.required.acks", "-1")
-    props
-  }
-
-  // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
-  // dependency
-  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
-    def makeAttempt(): Either[Throwable, T] = {
-      try {
-        Right(func)
-      } catch {
-        case e if NonFatal(e) => Left(e)
-      }
-    }
-
-    val startTime = System.currentTimeMillis()
-    @tailrec
-    def tryAgain(attempt: Int): T = {
-      makeAttempt() match {
-        case Right(result) => result
-        case Left(e) =>
-          val duration = System.currentTimeMillis() - startTime
-          if (duration < timeout.milliseconds) {
-            Thread.sleep(interval.milliseconds)
-          } else {
-            throw new TimeoutException(e.getMessage)
-          }
-
-          tryAgain(attempt + 1)
-      }
-    }
-
-    tryAgain(1)
-  }
-
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): 
Unit = {
-    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, 
partition) match {
-      case Some(partitionState) =>
-        val leaderAndInSyncReplicas = 
partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
-        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.size >= 1
-
-      case _ =>
-        false
-    }
-    eventually(Time(10000), Time(100)) {
-      assert(isPropagated, s"Partition [$topic, $partition] metadata not 
propagated after timeout")
-    }
-  }
-
-  private class EmbeddedZookeeper(val zkConnect: String) {
-    val snapshotDir = Utils.createTempDir()
-    val logDir = Utils.createTempDir()
-
-    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
-    val (ip, port) = {
-      val splits = zkConnect.split(":")
-      (splits(0), splits(1).toInt)
-    }
-    val factory = new NIOServerCnxnFactory()
-    factory.configure(new InetSocketAddress(ip, port), 16)
-    factory.startup(zookeeper)
-
-    val actualPort = factory.getLocalPort
-
-    def shutdown() {
-      factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
deleted file mode 100644
index edaafb9..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ /dev/null
@@ -1,805 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
-import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java._
-import org.apache.spark.streaming.dstream.{DStream, InputDStream, 
ReceiverInputDStream}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
-
-object KafkaUtils {
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param ssc       StreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId   The group id for this consumer
-   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                  in its own thread
-   * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      ssc: StreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: Map[String, Int],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[(String, String)] = {
-    val kafkaParams = Map[String, String](
-      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
-      "zookeeper.connection.timeout.ms" -> "10000")
-    createStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics, storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param ssc         StreamingContext object
-   * @param kafkaParams Map of kafka configuration parameters,
-   *                    see http://kafka.apache.org/08/configuration.html
-   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                    in its own thread.
-   * @param storageLevel Storage level to use for storing the received objects
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam U type of Kafka message key decoder
-   * @tparam T type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: 
Decoder[_]: ClassTag](
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Map[String, Int],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = {
-    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, 
storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc      JavaStreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId   The group id for this consumer
-   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                  in its own thread
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createStream(jssc.ssc, zkQuorum, groupId, 
Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param jssc      JavaStreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
-   * @param groupId   The group id for this consumer.
-   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                  in its own thread.
-   * @param storageLevel RDD storage level.
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createStream(jssc.ssc, zkQuorum, groupId, 
Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param jssc      JavaStreamingContext object
-   * @param keyTypeClass Key type of DStream
-   * @param valueTypeClass value type of Dstream
-   * @param keyDecoderClass Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams Map of kafka configuration parameters,
-   *                    see http://kafka.apache.org/08/configuration.html
-   * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                in its own thread
-   * @param storageLevel RDD storage level.
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam U type of Kafka message key decoder
-   * @tparam T type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      jssc: JavaStreamingContext,
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
-
-    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
-    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
-
-    createStream[K, V, U, T](
-      jssc.ssc,
-      kafkaParams.asScala.toMap,
-      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /** get leaders for the given offset ranges, or throw an exception */
-  private def leadersForRanges(
-      kc: KafkaCluster,
-      offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] 
= {
-    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, 
o.partition)).toSet
-    val leaders = kc.findLeaders(topics)
-    KafkaCluster.checkErrors(leaders)
-  }
-
-  /** Make sure offsets are available in kafka, or throw an exception */
-  private def checkOffsets(
-      kc: KafkaCluster,
-      offsetRanges: Array[OffsetRange]): Unit = {
-    val topics = offsetRanges.map(_.topicAndPartition).toSet
-    val result = for {
-      low <- kc.getEarliestLeaderOffsets(topics).right
-      high <- kc.getLatestLeaderOffsets(topics).right
-    } yield {
-      offsetRanges.filterNot { o =>
-        low(o.topicAndPartition).offset <= o.fromOffset &&
-        o.untilOffset <= high(o.topicAndPartition).offset
-      }
-    }
-    val badRanges = KafkaCluster.checkErrors(result)
-    if (!badRanges.isEmpty) {
-      throw new SparkException("Offsets not available on leader: " + 
badRanges.mkString(","))
-    }
-  }
-
-  private[kafka] def getFromOffsets(
-      kc: KafkaCluster,
-      kafkaParams: Map[String, String],
-      topics: Set[String]
-    ): Map[TopicAndPartition, Long] = {
-    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
-    val result = for {
-      topicPartitions <- kc.getPartitions(topics).right
-      leaderOffsets <- (if (reset == Some("smallest")) {
-        kc.getEarliestLeaderOffsets(topicPartitions)
-      } else {
-        kc.getLatestLeaderOffsets(topicPartitions)
-      }).right
-    } yield {
-      leaderOffsets.map { case (tp, lo) =>
-          (tp, lo.offset)
-      }
-    }
-    KafkaCluster.checkErrors(result)
-  }
-
-  /**
-   * Create a RDD from Kafka using offset ranges for each topic and partition.
-   *
-   * @param sc SparkContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return RDD of (Kafka message key, Kafka message value)
-   */
-  def createRDD[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = sc.withScope {
-    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, 
mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val leaders = leadersForRanges(kc, offsetRanges)
-    checkOffsets(kc, offsetRanges)
-    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, 
messageHandler)
-  }
-
-  /**
-   * Create a RDD from Kafka using offset ranges for each topic and partition. 
This allows you
-   * specify the Kafka leader to connect to (to optimize fetching) and access 
the message as well
-   * as the metadata.
-   *
-   * @param sc SparkContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
-   *   in which case leaders will be looked up on the driver.
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return RDD of R
-   */
-  def createRDD[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag,
-    R: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      offsetRanges: Array[OffsetRange],
-      leaders: Map[TopicAndPartition, Broker],
-      messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = sc.withScope {
-    val kc = new KafkaCluster(kafkaParams)
-    val leaderMap = if (leaders.isEmpty) {
-      leadersForRanges(kc, offsetRanges)
-    } else {
-      // This could be avoided by refactoring KafkaRDD.leaders and 
KafkaCluster to use Broker
-      leaders.map {
-        case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
-      }
-    }
-    val cleanedHandler = sc.clean(messageHandler)
-    checkOffsets(kc, offsetRanges)
-    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, 
cleanedHandler)
-  }
-
-  /**
-   * Create a RDD from Kafka using offset ranges for each topic and partition.
-   *
-   * @param jsc JavaSparkContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param keyClass type of Kafka message key
-   * @param valueClass type of Kafka message value
-   * @param keyDecoderClass type of Kafka message key decoder
-   * @param valueDecoderClass type of Kafka message value decoder
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return RDD of (Kafka message key, Kafka message value)
-   */
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
-      jsc: JavaSparkContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      kafkaParams: JMap[String, String],
-      offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = jsc.sc.withScope {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    new JavaPairRDD(createRDD[K, V, KD, VD](
-      jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
-  }
-
-  /**
-   * Create a RDD from Kafka using offset ranges for each topic and partition. 
This allows you
-   * specify the Kafka leader to connect to (to optimize fetching) and access 
the message as well
-   * as the metadata.
-   *
-   * @param jsc JavaSparkContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
-   *   in which case leaders will be looked up on the driver.
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return RDD of R
-   */
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
-      jsc: JavaSparkContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      recordClass: Class[R],
-      kafkaParams: JMap[String, String],
-      offsetRanges: Array[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker],
-      messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = jsc.sc.withScope {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
-    val leaderMap = Map(leaders.asScala.toSeq: _*)
-    createRDD[K, V, KD, VD, R](
-      jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, 
messageHandler.call(_))
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the [[StreamingContext]]. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param ssc StreamingContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
-   *    starting point of the stream
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return DStream of R
-   */
-  def createDirectStream[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag,
-    R: ClassTag] (
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      fromOffsets: Map[TopicAndPartition, Long],
-      messageHandler: MessageAndMetadata[K, V] => R
-  ): InputDStream[R] = {
-    val cleanedHandler = ssc.sc.clean(messageHandler)
-    new DirectKafkaInputDStream[K, V, KD, VD, R](
-      ssc, kafkaParams, fromOffsets, cleanedHandler)
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the [[StreamingContext]]. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param ssc StreamingContext object
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
-   *   to determine where the stream starts (defaults to "largest")
-   * @param topics Names of the topics to consume
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createDirectStream[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag] (
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Set[String]
-  ): InputDStream[(K, V)] = {
-    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, 
mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
-    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
-      ssc, kafkaParams, fromOffsets, messageHandler)
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the [[StreamingContext]]. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param jssc JavaStreamingContext object
-   * @param keyClass Class of the keys in the Kafka records
-   * @param valueClass Class of the values in the Kafka records
-   * @param keyDecoderClass Class of the key decoder
-   * @param valueDecoderClass Class of the value decoder
-   * @param recordClass Class of the records in DStream
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
-   *    starting point of the stream
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return DStream of R
-   */
-  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
-      jssc: JavaStreamingContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      recordClass: Class[R],
-      kafkaParams: JMap[String, String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
-      messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaInputDStream[R] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
-    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
-    createDirectStream[K, V, KD, VD, R](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
-      cleanedHandler
-    )
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the [[StreamingContext]]. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param jssc JavaStreamingContext object
-   * @param keyClass Class of the keys in the Kafka records
-   * @param valueClass Class of the values in the Kafka records
-   * @param keyDecoderClass Class of the key decoder
-   * @param valueDecoderClass Class type of the value decoder
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
-   *   to determine where the stream starts (defaults to "largest")
-   * @param topics Names of the topics to consume
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
-      jssc: JavaStreamingContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      kafkaParams: JMap[String, String],
-      topics: JSet[String]
-    ): JavaPairInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    createDirectStream[K, V, KD, VD](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Set(topics.asScala.toSeq: _*)
-    )
-  }
-}
-
-/**
- * This is a helper class that wraps the KafkaUtils.createStream() into more
- * Python-friendly class and function so that it can be easily
- * instantiated and called from Python's KafkaUtils.
- *
- * The zero-arg constructor helps instantiate this class from the Class object
- * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
- * takes care of known parameters instead of passing them from Python
- */
-private[kafka] class KafkaUtilsPythonHelper {
-  import KafkaUtilsPythonHelper._
-
-  def createStream(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], 
Array[Byte]] = {
-    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder](
-      jssc,
-      classOf[Array[Byte]],
-      classOf[Array[Byte]],
-      classOf[DefaultDecoder],
-      classOf[DefaultDecoder],
-      kafkaParams,
-      topics,
-      storageLevel)
-  }
-
-  def createRDDWithoutMessageHandler(
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], 
Array[Byte])] = {
-    val messageHandler =
-      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, 
mmd.message)
-    new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, 
messageHandler))
-  }
-
-  def createRDDWithMessageHandler(
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
-    val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
-      new PythonMessageAndMetadata(
-        mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
-    val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, 
messageHandler).
-        mapPartitions(picklerIterator)
-    new JavaRDD(rdd)
-  }
-
-  private def createRDD[V: ClassTag](
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker],
-      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): 
RDD[V] = {
-    KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder, V](
-      jsc.sc,
-      kafkaParams.asScala.toMap,
-      offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
-      leaders.asScala.toMap,
-      messageHandler
-    )
-  }
-
-  def createDirectStreamWithoutMessageHandler(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], 
Array[Byte])] = {
-    val messageHandler =
-      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, 
mmd.message)
-    new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, 
messageHandler))
-  }
-
-  def createDirectStreamWithMessageHandler(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = 
{
-    val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
-      new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, 
mmd.key(), mmd.message())
-    val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, 
messageHandler).
-      mapPartitions(picklerIterator)
-    new JavaDStream(stream)
-  }
-
-  private def createDirectStream[V: ClassTag](
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
-      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): 
DStream[V] = {
-
-    val currentFromOffsets = if (!fromOffsets.isEmpty) {
-      val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
-      if (topicsFromOffsets != topics.asScala.toSet) {
-        throw new IllegalStateException(
-          s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
-          s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
-      }
-      Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
-    } else {
-      val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*))
-      KafkaUtils.getFromOffsets(
-        kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
-    }
-
-    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder, V](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Map(currentFromOffsets.toSeq: _*),
-      messageHandler)
-  }
-
-  def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, 
untilOffset: JLong
-    ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, 
untilOffset)
-
-  def createTopicAndPartition(topic: String, partition: JInt): 
TopicAndPartition =
-    TopicAndPartition(topic, partition)
-
-  def createBroker(host: String, port: JInt): Broker = Broker(host, port)
-
-  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
-    val parentRDDs = rdd.getNarrowAncestors
-    val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, 
_, _, _]])
-
-    require(
-      kafkaRDDs.length == 1,
-      "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
-        "with this RDD, please call this method only on a Kafka RDD.")
-
-    val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
-    kafkaRDD.offsetRanges.toSeq.asJava
-  }
-}
-
-private object KafkaUtilsPythonHelper {
-  private var initialized = false
-
-  def initialize(): Unit = {
-    SerDeUtil.initialize()
-    synchronized {
-      if (!initialized) {
-        new PythonMessageAndMetadataPickler().register()
-        initialized = true
-      }
-    }
-  }
-
-  initialize()
-
-  def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
-    new SerDeUtil.AutoBatchedPickler(iter)
-  }
-
-  case class PythonMessageAndMetadata(
-      topic: String,
-      partition: JInt,
-      offset: JLong,
-      key: Array[Byte],
-      message: Array[Byte])
-
-  class PythonMessageAndMetadataPickler extends IObjectPickler {
-    private val module = "pyspark.streaming.kafka"
-
-    def register(): Unit = {
-      Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
-      Pickler.registerCustomPickler(this.getClass, this)
-    }
-
-    def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
-      if (obj == this) {
-        out.write(Opcodes.GLOBAL)
-        out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
-      } else {
-        pickler.save(this)
-        val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
-        out.write(Opcodes.MARK)
-        pickler.save(msgAndMetaData.topic)
-        pickler.save(msgAndMetaData.partition)
-        pickler.save(msgAndMetaData.offset)
-        pickler.save(msgAndMetaData.key)
-        pickler.save(msgAndMetaData.message)
-        out.write(Opcodes.TUPLE)
-        out.write(Opcodes.REDUCE)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
deleted file mode 100644
index d9b856e..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import kafka.common.TopicAndPartition
-
-/**
- * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
- * offset ranges in RDDs generated by the direct Kafka DStream (see
- * [[KafkaUtils.createDirectStream()]]).
- * {{{
- *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
- *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- *      ...
- *   }
- * }}}
- */
-trait HasOffsetRanges {
-  def offsetRanges: Array[OffsetRange]
-}
-
-/**
- * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
- * can be created with `OffsetRange.create()`.
- * @param topic Kafka topic name
- * @param partition Kafka partition id
- * @param fromOffset Inclusive starting offset
- * @param untilOffset Exclusive ending offset
- */
-final class OffsetRange private(
-    val topic: String,
-    val partition: Int,
-    val fromOffset: Long,
-    val untilOffset: Long) extends Serializable {
-  import OffsetRange.OffsetRangeTuple
-
-  /** Kafka TopicAndPartition object, for convenience */
-  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
-
-  /** Number of messages this OffsetRange refers to */
-  def count(): Long = untilOffset - fromOffset
-
-  override def equals(obj: Any): Boolean = obj match {
-    case that: OffsetRange =>
-      this.topic == that.topic &&
-        this.partition == that.partition &&
-        this.fromOffset == that.fromOffset &&
-        this.untilOffset == that.untilOffset
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    toTuple.hashCode()
-  }
-
-  override def toString(): String = {
-    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
-  }
-
-  /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[streaming]
-  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
-}
-
-/**
- * Companion object the provides methods to create instances of [[OffsetRange]].
- */
-object OffsetRange {
-  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
-    new OffsetRange(topic, partition, fromOffset, untilOffset)
-
-  def create(
-      topicAndPartition: TopicAndPartition,
-      fromOffset: Long,
-      untilOffset: Long): OffsetRange =
-    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
-
-  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
-    new OffsetRange(topic, partition, fromOffset, untilOffset)
-
-  def apply(
-      topicAndPartition: TopicAndPartition,
-      fromOffset: Long,
-      untilOffset: Long): OffsetRange =
-    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
-
-  /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[kafka]
-  type OffsetRangeTuple = (String, Int, Long, Long)
-
-  private[kafka]
-  def apply(t: OffsetRangeTuple) =
-    new OffsetRange(t._1, t._2, t._3, t._4)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
deleted file mode 100644
index 39abe3c..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
-
-import scala.collection.{mutable, Map}
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.common.TopicAndPartition
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.ThreadUtils
-
-/**
- * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
- * It is turned off by default and will be enabled when
- * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
- * is that this receiver manages topic-partition/offset itself and updates the offset information
- * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
- * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
- *
- * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
- * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
- * will not take effect.
- */
-private[streaming]
-class ReliableKafkaReceiver[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel)
-    extends Receiver[(K, V)](storageLevel) with Logging {
-
-  private val groupId = kafkaParams("group.id")
-  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-  private def conf = SparkEnv.get.conf
-
-  /** High level consumer to connect to Kafka. */
-  private var consumerConnector: ConsumerConnector = null
-
-  /** zkClient to connect to Zookeeper to commit the offsets. */
-  private var zkClient: ZkClient = null
-
-  /**
-   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
-   * synchronized block, so mutable HashMap will not meet concurrency issue.
-   */
-  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
-
-  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
-  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
-
-  /**
-   * Manage the BlockGenerator in receiver itself for better managing block store and offset
-   * commit.
-   */
-  private var blockGenerator: BlockGenerator = null
-
-  /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
-  private var messageHandlerThreadPool: ThreadPoolExecutor = null
-
-  override def onStart(): Unit = {
-    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
-
-    // Initialize the topic-partition / offset hash map.
-    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
-
-    // Initialize the stream block id / offset snapshot hash map.
-    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
-
-    // Initialize the block generator for storing Kafka message.
-    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
-
-    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
-      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
-        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
-    }
-
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
-    // we have to make sure this property is set to false to turn off auto commit mechanism in
-    // Kafka.
-    props.setProperty(AUTO_OFFSET_COMMIT, "false")
-
-    val consumerConfig = new ConsumerConfig(props)
-
-    assert(!consumerConfig.autoCommitEnable)
-
-    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
-
-    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
-      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
-
-    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
-      topics.values.sum, "KafkaMessageHandler")
-
-    blockGenerator.start()
-
-    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-
-    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream =>
-        messageHandlerThreadPool.submit(new MessageHandler(stream))
-      }
-    }
-  }
-
-  override def onStop(): Unit = {
-    if (messageHandlerThreadPool != null) {
-      messageHandlerThreadPool.shutdown()
-      messageHandlerThreadPool = null
-    }
-
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (blockGenerator != null) {
-      blockGenerator.stop()
-      blockGenerator = null
-    }
-
-    if (topicPartitionOffsetMap != null) {
-      topicPartitionOffsetMap.clear()
-      topicPartitionOffsetMap = null
-    }
-
-    if (blockOffsetMap != null) {
-      blockOffsetMap.clear()
-      blockOffsetMap = null
-    }
-  }
-
-  /** Store a Kafka message and the associated metadata as a tuple. */
-  private def storeMessageAndMetadata(
-      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
-    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
-    val data = (msgAndMetadata.key, msgAndMetadata.message)
-    val metadata = (topicAndPartition, msgAndMetadata.offset)
-    blockGenerator.addDataWithCallback(data, metadata)
-  }
-
-  /** Update stored offset */
-  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
-    topicPartitionOffsetMap.put(topicAndPartition, offset)
-  }
-
-  /**
-   * Remember the current offsets for each topic and partition. This is called when a block is
-   * generated.
-   */
-  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
-    // Get a snapshot of current offset map and store with related block id.
-    val offsetSnapshot = topicPartitionOffsetMap.toMap
-    blockOffsetMap.put(blockId, offsetSnapshot)
-    topicPartitionOffsetMap.clear()
-  }
-
-  /**
-   * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
-   * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
-   */
-  private def storeBlockAndCommitOffset(
-      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    var count = 0
-    var pushed = false
-    var exception: Exception = null
-    while (!pushed && count <= 3) {
-      try {
-        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-        pushed = true
-      } catch {
-        case ex: Exception =>
-          count += 1
-          exception = ex
-      }
-    }
-    if (pushed) {
-      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
-      blockOffsetMap.remove(blockId)
-    } else {
-      stop("Error while storing block into Spark", exception)
-    }
-  }
-
-  /**
-   * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
-   * metadata schema in Zookeeper.
-   */
-  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
-    if (zkClient == null) {
-      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
-      stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
-      return
-    }
-
-    for ((topicAndPart, offset) <- offsetMap) {
-      try {
-        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
-        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
-
-        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
-      } catch {
-        case e: Exception =>
-          logWarning(s"Exception during commit offset $offset for topic" +
-            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
-      }
-
-      logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
-        s"partition ${topicAndPart.partition}")
-    }
-  }
-
-  /** Class to handle received Kafka message. */
-  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
-    override def run(): Unit = {
-      while (!isStopped) {
-        try {
-          val streamIterator = stream.iterator()
-          while (streamIterator.hasNext) {
-            storeMessageAndMetadata(streamIterator.next)
-          }
-        } catch {
-          case e: Exception =>
-            reportError("Error handling message", e)
-        }
-      }
-    }
-  }
-
-  /** Class to handle blocks generated by the block generator. */
-  private final class GeneratedBlockHandler extends BlockGeneratorListener {
-
-    def onAddData(data: Any, metadata: Any): Unit = {
-      // Update the offset of the data that was added to the generator
-      if (metadata != null) {
-        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
-        updateOffset(topicAndPartition, offset)
-      }
-    }
-
-    def onGenerateBlock(blockId: StreamBlockId): Unit = {
-      // Remember the offsets of topics/partitions when a block has been generated
-      rememberBlockOffsets(blockId)
-    }
-
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      // Store block and commit the blocks offset
-      storeBlockAndCommitOffset(blockId, arrayBuffer)
-    }
-
-    def onError(message: String, throwable: Throwable): Unit = {
-      reportError(message, throwable)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
deleted file mode 100644
index 2e5ab0f..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Kafka receiver for spark streaming.
- */
-package org.apache.spark.streaming.kafka;

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
deleted file mode 100644
index 47c5187..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Kafka receiver for spark streaming,
- */
-package object kafka

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
deleted file mode 100644
index fa6b0db..0000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaDirectKafkaStreamSuite implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
-  }
-
-  @After
-  public void tearDown() {
-    if (ssc != null) {
-      ssc.stop();
-      ssc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaStream() throws InterruptedException {
-    final String topic1 = "topic1";
-    final String topic2 = "topic2";
-    // hold a reference to the current offset ranges, so it can be used downstream
-    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
-    String[] topic1data = createTopicAndSendData(topic1);
-    String[] topic2data = createTopicAndSendData(topic2);
-
-    Set<String> sent = new HashSet<>();
-    sent.addAll(Arrays.asList(topic1data));
-    sent.addAll(Arrays.asList(topic2data));
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
-    kafkaParams.put("auto.offset.reset", "smallest");
-
-    JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
-        ssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        topicToSet(topic1)
-    ).transformToPair(
-        // Make sure you can get offset ranges from the rdd
-        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
-          @Override
-          public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
-            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic1, offsets[0].topic());
-            return rdd;
-          }
-        }
-    ).map(
-        new Function<Tuple2<String, String>, String>() {
-          @Override
-          public String call(Tuple2<String, String> kv) {
-            return kv._2();
-          }
-        }
-    );
-
-    JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
-        ssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        topicOffsetToMap(topic2, 0L),
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-    JavaDStream<String> unifiedStream = stream1.union(stream2);
-
-    final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
-    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-          @Override
-          public void call(JavaRDD<String> rdd) {
-            result.addAll(rdd.collect());
-            for (OffsetRange o : offsetRanges.get()) {
-              System.out.println(
-                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-              );
-            }
-          }
-        }
-    );
-    ssc.start();
-    long startTime = System.currentTimeMillis();
-    boolean matches = false;
-    while (!matches && System.currentTimeMillis() - startTime < 20000) {
-      matches = sent.size() == result.size();
-      Thread.sleep(50);
-    }
-    Assert.assertEquals(sent, result);
-    ssc.stop();
-  }
-
-  private static Set<String> topicToSet(String topic) {
-    Set<String> topicSet = new HashSet<>();
-    topicSet.add(topic);
-    return topicSet;
-  }
-
-  private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
-    Map<TopicAndPartition, Long> topicMap = new HashMap<>();
-    topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
-    return topicMap;
-  }
-
-  private  String[] createTopicAndSendData(String topic) {
-    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, data);
-    return data;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to