http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala deleted file mode 100644 index d4881b1..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{classTag, ClassTag} - -import kafka.api.{FetchRequestBuilder, FetchResponse} -import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.consumer.SimpleConsumer -import kafka.message.{MessageAndMetadata, MessageAndOffset} -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties - -import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.internal.Logging -import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator - -/** - * A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param messageHandler function for translating each message into the desired type - */ -private[kafka] -class KafkaRDD[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag] private[spark] ( - sc: SparkContext, - kafkaParams: Map[String, String], - val offsetRanges: Array[OffsetRange], - leaders: Map[TopicAndPartition, (String, Int)], - messageHandler: MessageAndMetadata[K, V] => R - ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { - override def getPartitions: Array[Partition] = { - offsetRanges.zipWithIndex.map { case (o, i) => - val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) - new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) - }.toArray - } - - override def count(): Long = offsetRanges.map(_.count).sum - - override def countApprox( - timeout: Long, - confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { - val c = count - new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[R] = { - val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) - - if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[R](0) - } - - // Determine in advance how many messages need to be taken from each partition - val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { - val taken = Math.min(remain, part.count) - result + (part.index -> taken.toInt) - } else { - result - } - } - - val buf = new ArrayBuffer[R] - val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray, - parts.keys.toArray) - res.foreach(buf ++= _) - buf.toArray - } - - override def getPreferredLocations(thePart: Partition): Seq[String] = { - val part = thePart.asInstanceOf[KafkaRDDPartition] - // TODO is additional hostname resolution necessary here - Seq(part.host) - } - - private def errBeginAfterEnd(part: KafkaRDDPartition): String = - s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " + - s"for topic ${part.topic} partition ${part.partition}. " + - "You either provided an invalid fromOffset, or the Kafka topic has been damaged" - - private def errRanOutBeforeEnd(part: KafkaRDDPartition): String = - s"Ran out of messages before reaching ending offset ${part.untilOffset} " + - s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." + - " This should not happen, and indicates that messages may have been lost" - - private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String = - s"Got ${itemOffset} > ending offset ${part.untilOffset} " + - s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." + - " This should not happen, and indicates a message may have been skipped" - - override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { - val part = thePart.asInstanceOf[KafkaRDDPartition] - assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) - if (part.fromOffset == part.untilOffset) { - log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " + - s"skipping ${part.topic} ${part.partition}") - Iterator.empty - } else { - new KafkaRDDIterator(part, context) - } - } - - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends NextIterator[R] { - - context.addTaskCompletionListener{ context => closeIfNeeded() } - - log.info(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - - val kc = new KafkaCluster(kafkaParams) - val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(kc.config.props) - .asInstanceOf[Decoder[K]] - val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(kc.config.props) - .asInstanceOf[Decoder[V]] - val consumer = connectLeader - var requestOffset = part.fromOffset - var iter: Iterator[MessageAndOffset] = null - - // The idea is to use the provided preferred host, except on task retry attempts, - // to minimize number of kafka metadata requests - private def connectLeader: SimpleConsumer = { - if (context.attemptNumber > 0) { - kc.connectLeader(part.topic, part.partition).fold( - errs => throw new SparkException( - s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " + - errs.mkString("\n")), - consumer => consumer - ) - } else { - kc.connect(part.host, part.port) - } - } - - private def handleFetchErr(resp: FetchResponse) { - if (resp.hasError) { - val err = resp.errorCode(part.topic, part.partition) - if (err == ErrorMapping.LeaderNotAvailableCode || - err == ErrorMapping.NotLeaderForPartitionCode) { - log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " + - s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms") - Thread.sleep(kc.config.refreshLeaderBackoffMs) - } - // Let normal rdd retry sort out reconnect attempts - throw ErrorMapping.exceptionFor(err) - } - } - - private def fetchBatch: Iterator[MessageAndOffset] = { - val req = new FetchRequestBuilder() - .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes) - .build() - val resp = consumer.fetch(req) - handleFetchErr(resp) - // kafka may return a batch that starts before the requested offset - resp.messageSet(part.topic, part.partition) - .iterator - .dropWhile(_.offset < requestOffset) - } - - override def close(): Unit = { - if (consumer != null) { - consumer.close() - } - } - - override def getNext(): R = { - if (iter == null || !iter.hasNext) { - iter = fetchBatch - } - if (!iter.hasNext) { - assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)) - finished = true - null.asInstanceOf[R] - } else { - val item = iter.next() - if (item.offset >= part.untilOffset) { - assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part)) - finished = true - null.asInstanceOf[R] - } else { - requestOffset = item.nextOffset - messageHandler(new MessageAndMetadata( - part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder)) - } - } - } - } -} - -private[kafka] -object KafkaRDD { - import KafkaCluster.LeaderOffset - - /** - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the batch - * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) - * ending point of the batch - * @param messageHandler function for translating each message into the desired type - */ - def apply[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag]( - sc: SparkContext, - kafkaParams: Map[String, String], - fromOffsets: Map[TopicAndPartition, Long], - untilOffsets: Map[TopicAndPartition, LeaderOffset], - messageHandler: MessageAndMetadata[K, V] => R - ): KafkaRDD[K, V, U, T, R] = { - val leaders = untilOffsets.map { case (tp, lo) => - tp -> (lo.host, lo.port) - }.toMap - - val offsetRanges = fromOffsets.map { case (tp, fo) => - val uo = untilOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo.offset) - }.toArray - - new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler) - } -}
http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala deleted file mode 100644 index 02917be..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import org.apache.spark.Partition - -/** - * @param topic kafka topic name - * @param partition kafka partition id - * @param fromOffset inclusive starting offset - * @param untilOffset exclusive ending offset - * @param host preferred kafka host, i.e. the leader at the time the rdd was created - * @param port preferred kafka host's port - */ -private[kafka] -class KafkaRDDPartition( - val index: Int, - val topic: String, - val partition: Int, - val fromOffset: Long, - val untilOffset: Long, - val host: String, - val port: Int -) extends Partition { - /** Number of messages this partition refers to */ - def count(): Long = untilOffset - fromOffset -} http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala deleted file mode 100644 index d9d4240..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.io.File -import java.lang.{Integer => JInt} -import java.net.InetSocketAddress -import java.util.{Map => JMap, Properties} -import java.util.concurrent.TimeoutException - -import scala.annotation.tailrec -import scala.collection.JavaConverters._ -import scala.language.postfixOps -import scala.util.control.NonFatal - -import kafka.admin.AdminUtils -import kafka.api.Request -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer.StringEncoder -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{ZKStringSerializer, ZkUtils} -import org.I0Itec.zkclient.ZkClient -import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.streaming.Time -import org.apache.spark.util.Utils - -/** - * This is a helper class for Kafka test suites. This has the functionality to set up - * and tear down local Kafka servers, and to push data using Kafka producers. - * - * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. - */ -private[kafka] class KafkaTestUtils extends Logging { - - // Zookeeper related configurations - private val zkHost = "localhost" - private var zkPort: Int = 0 - private val zkConnectionTimeout = 60000 - private val zkSessionTimeout = 6000 - - private var zookeeper: EmbeddedZookeeper = _ - - private var zkClient: ZkClient = _ - - // Kafka broker related configurations - private val brokerHost = "localhost" - private var brokerPort = 9092 - private var brokerConf: KafkaConfig = _ - - // Kafka broker server - private var server: KafkaServer = _ - - // Kafka producer - private var producer: Producer[String, String] = _ - - // Flag to test whether the system is correctly started - private var zkReady = false - private var brokerReady = false - - def zkAddress: String = { - assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") - s"$zkHost:$zkPort" - } - - def brokerAddress: String = { - assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") - s"$brokerHost:$brokerPort" - } - - def zookeeperClient: ZkClient = { - assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") - Option(zkClient).getOrElse( - throw new IllegalStateException("Zookeeper client is not yet initialized")) - } - - // Set up the Embedded Zookeeper server and get the proper Zookeeper port - private def setupEmbeddedZookeeper(): Unit = { - // Zookeeper server startup - zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") - // Get the actual zookeeper binding port - zkPort = zookeeper.actualPort - zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, - ZKStringSerializer) - zkReady = true - } - - // Set up the Embedded Kafka server - private def setupEmbeddedKafkaServer(): Unit = { - assert(zkReady, "Zookeeper should be set up beforehand") - - // Kafka broker startup - Utils.startServiceOnPort(brokerPort, port => { - brokerPort = port - brokerConf = new KafkaConfig(brokerConfiguration) - server = new KafkaServer(brokerConf) - server.startup() - (server, port) - }, new SparkConf(), "KafkaBroker") - - brokerReady = true - } - - /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ - def setup(): Unit = { - setupEmbeddedZookeeper() - setupEmbeddedKafkaServer() - } - - /** Teardown the whole servers, including Kafka broker and Zookeeper */ - def teardown(): Unit = { - brokerReady = false - zkReady = false - - if (producer != null) { - producer.close() - producer = null - } - - if (server != null) { - server.shutdown() - server = null - } - - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } - - if (zkClient != null) { - zkClient.close() - zkClient = null - } - - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null - } - } - - /** Create a Kafka topic and wait until it is propagated to the whole cluster */ - def createTopic(topic: String, partitions: Int): Unit = { - AdminUtils.createTopic(zkClient, topic, partitions, 1) - // wait until metadata is propagated - (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) } - } - - /** Single-argument version for backwards compatibility */ - def createTopic(topic: String): Unit = createTopic(topic, 1) - - /** Java-friendly function for sending messages to the Kafka broker */ - def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) - } - - /** Send the messages to the Kafka broker */ - def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { - val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray - sendMessages(topic, messages) - } - - /** Send the array of messages to the Kafka broker */ - def sendMessages(topic: String, messages: Array[String]): Unit = { - producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) - producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) - producer.close() - producer = null - } - - private def brokerConfiguration: Properties = { - val props = new Properties() - props.put("broker.id", "0") - props.put("host.name", "localhost") - props.put("port", brokerPort.toString) - props.put("log.dir", Utils.createTempDir().getAbsolutePath) - props.put("zookeeper.connect", zkAddress) - props.put("log.flush.interval.messages", "1") - props.put("replica.socket.timeout.ms", "1500") - props - } - - private def producerConfiguration: Properties = { - val props = new Properties() - props.put("metadata.broker.list", brokerAddress) - props.put("serializer.class", classOf[StringEncoder].getName) - // wait for all in-sync replicas to ack sends - props.put("request.required.acks", "-1") - props - } - - // A simplified version of scalatest eventually, rewritten here to avoid adding extra test - // dependency - def eventually[T](timeout: Time, interval: Time)(func: => T): T = { - def makeAttempt(): Either[Throwable, T] = { - try { - Right(func) - } catch { - case e if NonFatal(e) => Left(e) - } - } - - val startTime = System.currentTimeMillis() - @tailrec - def tryAgain(attempt: Int): T = { - makeAttempt() match { - case Right(result) => result - case Left(e) => - val duration = System.currentTimeMillis() - startTime - if (duration < timeout.milliseconds) { - Thread.sleep(interval.milliseconds) - } else { - throw new TimeoutException(e.getMessage) - } - - tryAgain(attempt + 1) - } - } - - tryAgain(1) - } - - private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { - def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { - case Some(partitionState) => - val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr - - ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined && - Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.size >= 1 - - case _ => - false - } - eventually(Time(10000), Time(100)) { - assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") - } - } - - private class EmbeddedZookeeper(val zkConnect: String) { - val snapshotDir = Utils.createTempDir() - val logDir = Utils.createTempDir() - - val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) - val (ip, port) = { - val splits = zkConnect.split(":") - (splits(0), splits(1).toInt) - } - val factory = new NIOServerCnxnFactory() - factory.configure(new InetSocketAddress(ip, port), 16) - factory.startup(zookeeper) - - val actualPort = factory.getLocalPort - - def shutdown() { - factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) - } - } -} - http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala deleted file mode 100644 index edaafb9..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ /dev/null @@ -1,805 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.io.OutputStream -import java.lang.{Integer => JInt, Long => JLong} -import java.nio.charset.StandardCharsets -import java.util.{List => JList, Map => JMap, Set => JSet} - -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag - -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata -import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} -import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} - -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.api.python.SerDeUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java._ -import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} -import org.apache.spark.streaming.util.WriteAheadLogUtils - -object KafkaUtils { - /** - * Create an input stream that pulls messages from Kafka Brokers. - * @param ssc StreamingContext object - * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) - * @param groupId The group id for this consumer - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - * @return DStream of (Kafka message key, Kafka message value) - */ - def createStream( - ssc: StreamingContext, - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics, storageLevel) - } - - /** - * Create an input stream that pulls messages from Kafka Brokers. - * @param ssc StreamingContext object - * @param kafkaParams Map of kafka configuration parameters, - * see http://kafka.apache.org/08/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam U type of Kafka message key decoder - * @tparam T type of Kafka message value decoder - * @return DStream of (Kafka message key, Kafka message value) - */ - def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( - ssc: StreamingContext, - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): ReceiverInputDStream[(K, V)] = { - val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) - new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) - } - - /** - * Create an input stream that pulls messages from Kafka Brokers. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param jssc JavaStreamingContext object - * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) - * @param groupId The group id for this consumer - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread - * @return DStream of (Kafka message key, Kafka message value) - */ - def createStream( - jssc: JavaStreamingContext, - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt] - ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*)) - } - - /** - * Create an input stream that pulls messages from Kafka Brokers. - * @param jssc JavaStreamingContext object - * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. - * @return DStream of (Kafka message key, Kafka message value) - */ - def createStream( - jssc: JavaStreamingContext, - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel - ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** - * Create an input stream that pulls messages from Kafka Brokers. - * @param jssc JavaStreamingContext object - * @param keyTypeClass Key type of DStream - * @param valueTypeClass value type of Dstream - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration parameters, - * see http://kafka.apache.org/08/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread - * @param storageLevel RDD storage level. - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam U type of Kafka message key decoder - * @tparam T type of Kafka message value decoder - * @return DStream of (Kafka message key, Kafka message value) - */ - def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( - jssc: JavaStreamingContext, - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel - ): JavaPairReceiverInputDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass) - implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass) - - implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass) - implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass) - - createStream[K, V, U, T]( - jssc.ssc, - kafkaParams.asScala.toMap, - Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** get leaders for the given offset ranges, or throw an exception */ - private def leadersForRanges( - kc: KafkaCluster, - offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = { - val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet - val leaders = kc.findLeaders(topics) - KafkaCluster.checkErrors(leaders) - } - - /** Make sure offsets are available in kafka, or throw an exception */ - private def checkOffsets( - kc: KafkaCluster, - offsetRanges: Array[OffsetRange]): Unit = { - val topics = offsetRanges.map(_.topicAndPartition).toSet - val result = for { - low <- kc.getEarliestLeaderOffsets(topics).right - high <- kc.getLatestLeaderOffsets(topics).right - } yield { - offsetRanges.filterNot { o => - low(o.topicAndPartition).offset <= o.fromOffset && - o.untilOffset <= high(o.topicAndPartition).offset - } - } - val badRanges = KafkaCluster.checkErrors(result) - if (!badRanges.isEmpty) { - throw new SparkException("Offsets not available on leader: " + badRanges.mkString(",")) - } - } - - private[kafka] def getFromOffsets( - kc: KafkaCluster, - kafkaParams: Map[String, String], - topics: Set[String] - ): Map[TopicAndPartition, Long] = { - val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) - val result = for { - topicPartitions <- kc.getPartitions(topics).right - leaderOffsets <- (if (reset == Some("smallest")) { - kc.getEarliestLeaderOffsets(topicPartitions) - } else { - kc.getLatestLeaderOffsets(topicPartitions) - }).right - } yield { - leaderOffsets.map { case (tp, lo) => - (tp, lo.offset) - } - } - KafkaCluster.checkErrors(result) - } - - /** - * Create a RDD from Kafka using offset ranges for each topic and partition. - * - * @param sc SparkContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers) specified in - * host1:port1,host2:port2 form. - * @param offsetRanges Each OffsetRange in the batch corresponds to a - * range of offsets for a given Kafka topic/partition - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @return RDD of (Kafka message key, Kafka message value) - */ - def createRDD[ - K: ClassTag, - V: ClassTag, - KD <: Decoder[K]: ClassTag, - VD <: Decoder[V]: ClassTag]( - sc: SparkContext, - kafkaParams: Map[String, String], - offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = sc.withScope { - val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) - val kc = new KafkaCluster(kafkaParams) - val leaders = leadersForRanges(kc, offsetRanges) - checkOffsets(kc, offsetRanges) - new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) - } - - /** - * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you - * specify the Kafka leader to connect to (to optimize fetching) and access the message as well - * as the metadata. - * - * @param sc SparkContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers) specified in - * host1:port1,host2:port2 form. - * @param offsetRanges Each OffsetRange in the batch corresponds to a - * range of offsets for a given Kafka topic/partition - * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, - * in which case leaders will be looked up on the driver. - * @param messageHandler Function for translating each message and metadata into the desired type - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @tparam R type returned by messageHandler - * @return RDD of R - */ - def createRDD[ - K: ClassTag, - V: ClassTag, - KD <: Decoder[K]: ClassTag, - VD <: Decoder[V]: ClassTag, - R: ClassTag]( - sc: SparkContext, - kafkaParams: Map[String, String], - offsetRanges: Array[OffsetRange], - leaders: Map[TopicAndPartition, Broker], - messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = sc.withScope { - val kc = new KafkaCluster(kafkaParams) - val leaderMap = if (leaders.isEmpty) { - leadersForRanges(kc, offsetRanges) - } else { - // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker - leaders.map { - case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) - } - } - val cleanedHandler = sc.clean(messageHandler) - checkOffsets(kc, offsetRanges) - new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler) - } - - /** - * Create a RDD from Kafka using offset ranges for each topic and partition. - * - * @param jsc JavaSparkContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers) specified in - * host1:port1,host2:port2 form. - * @param offsetRanges Each OffsetRange in the batch corresponds to a - * range of offsets for a given Kafka topic/partition - * @param keyClass type of Kafka message key - * @param valueClass type of Kafka message value - * @param keyDecoderClass type of Kafka message key decoder - * @param valueDecoderClass type of Kafka message value decoder - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @return RDD of (Kafka message key, Kafka message value) - */ - def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( - jsc: JavaSparkContext, - keyClass: Class[K], - valueClass: Class[V], - keyDecoderClass: Class[KD], - valueDecoderClass: Class[VD], - kafkaParams: JMap[String, String], - offsetRanges: Array[OffsetRange] - ): JavaPairRDD[K, V] = jsc.sc.withScope { - implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) - implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) - implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) - implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - new JavaPairRDD(createRDD[K, V, KD, VD]( - jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges)) - } - - /** - * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you - * specify the Kafka leader to connect to (to optimize fetching) and access the message as well - * as the metadata. - * - * @param jsc JavaSparkContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers) specified in - * host1:port1,host2:port2 form. - * @param offsetRanges Each OffsetRange in the batch corresponds to a - * range of offsets for a given Kafka topic/partition - * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, - * in which case leaders will be looked up on the driver. - * @param messageHandler Function for translating each message and metadata into the desired type - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @tparam R type returned by messageHandler - * @return RDD of R - */ - def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( - jsc: JavaSparkContext, - keyClass: Class[K], - valueClass: Class[V], - keyDecoderClass: Class[KD], - valueDecoderClass: Class[VD], - recordClass: Class[R], - kafkaParams: JMap[String, String], - offsetRanges: Array[OffsetRange], - leaders: JMap[TopicAndPartition, Broker], - messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaRDD[R] = jsc.sc.withScope { - implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) - implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) - implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) - implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) - val leaderMap = Map(leaders.asScala.toSeq: _*) - createRDD[K, V, KD, VD, R]( - jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_)) - } - - /** - * Create an input stream that directly pulls messages from Kafka Brokers - * without using any receiver. This stream can guarantee that each message - * from Kafka is included in transformations exactly once (see points below). - * - * Points to note: - * - No receivers: This stream does not use any receiver. It directly queries Kafka - * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked - * by the stream itself. For interoperability with Kafka monitoring tools that depend on - * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. - * You can access the offsets used in each batch from the generated RDDs (see - * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). - * - Failure Recovery: To recover from driver failures, you have to enable checkpointing - * in the [[StreamingContext]]. The information on consumed offset can be - * recovered from the checkpoint. See the programming guide for details (constraints, etc.). - * - End-to-end semantics: This stream ensures that every records is effectively received and - * transformed exactly once, but gives no guarantees on whether the transformed data are - * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure - * that the output operation is idempotent, or use transactions to output records atomically. - * See the programming guide for more details. - * - * @param ssc StreamingContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers) specified in - * host1:port1,host2:port2 form. - * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream - * @param messageHandler Function for translating each message and metadata into the desired type - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @tparam R type returned by messageHandler - * @return DStream of R - */ - def createDirectStream[ - K: ClassTag, - V: ClassTag, - KD <: Decoder[K]: ClassTag, - VD <: Decoder[V]: ClassTag, - R: ClassTag] ( - ssc: StreamingContext, - kafkaParams: Map[String, String], - fromOffsets: Map[TopicAndPartition, Long], - messageHandler: MessageAndMetadata[K, V] => R - ): InputDStream[R] = { - val cleanedHandler = ssc.sc.clean(messageHandler) - new DirectKafkaInputDStream[K, V, KD, VD, R]( - ssc, kafkaParams, fromOffsets, cleanedHandler) - } - - /** - * Create an input stream that directly pulls messages from Kafka Brokers - * without using any receiver. This stream can guarantee that each message - * from Kafka is included in transformations exactly once (see points below). - * - * Points to note: - * - No receivers: This stream does not use any receiver. It directly queries Kafka - * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked - * by the stream itself. For interoperability with Kafka monitoring tools that depend on - * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. - * You can access the offsets used in each batch from the generated RDDs (see - * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). - * - Failure Recovery: To recover from driver failures, you have to enable checkpointing - * in the [[StreamingContext]]. The information on consumed offset can be - * recovered from the checkpoint. See the programming guide for details (constraints, etc.). - * - End-to-end semantics: This stream ensures that every records is effectively received and - * transformed exactly once, but gives no guarantees on whether the transformed data are - * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure - * that the output operation is idempotent, or use transactions to output records atomically. - * See the programming guide for more details. - * - * @param ssc StreamingContext object - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers), specified in - * host1:port1,host2:port2 form. - * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" - * to determine where the stream starts (defaults to "largest") - * @param topics Names of the topics to consume - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @return DStream of (Kafka message key, Kafka message value) - */ - def createDirectStream[ - K: ClassTag, - V: ClassTag, - KD <: Decoder[K]: ClassTag, - VD <: Decoder[V]: ClassTag] ( - ssc: StreamingContext, - kafkaParams: Map[String, String], - topics: Set[String] - ): InputDStream[(K, V)] = { - val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) - val kc = new KafkaCluster(kafkaParams) - val fromOffsets = getFromOffsets(kc, kafkaParams, topics) - new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( - ssc, kafkaParams, fromOffsets, messageHandler) - } - - /** - * Create an input stream that directly pulls messages from Kafka Brokers - * without using any receiver. This stream can guarantee that each message - * from Kafka is included in transformations exactly once (see points below). - * - * Points to note: - * - No receivers: This stream does not use any receiver. It directly queries Kafka - * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked - * by the stream itself. For interoperability with Kafka monitoring tools that depend on - * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. - * You can access the offsets used in each batch from the generated RDDs (see - * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). - * - Failure Recovery: To recover from driver failures, you have to enable checkpointing - * in the [[StreamingContext]]. The information on consumed offset can be - * recovered from the checkpoint. See the programming guide for details (constraints, etc.). - * - End-to-end semantics: This stream ensures that every records is effectively received and - * transformed exactly once, but gives no guarantees on whether the transformed data are - * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure - * that the output operation is idempotent, or use transactions to output records atomically. - * See the programming guide for more details. - * - * @param jssc JavaStreamingContext object - * @param keyClass Class of the keys in the Kafka records - * @param valueClass Class of the values in the Kafka records - * @param keyDecoderClass Class of the key decoder - * @param valueDecoderClass Class of the value decoder - * @param recordClass Class of the records in DStream - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers), specified in - * host1:port1,host2:port2 form. - * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream - * @param messageHandler Function for translating each message and metadata into the desired type - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @tparam R type returned by messageHandler - * @return DStream of R - */ - def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( - jssc: JavaStreamingContext, - keyClass: Class[K], - valueClass: Class[V], - keyDecoderClass: Class[KD], - valueDecoderClass: Class[VD], - recordClass: Class[R], - kafkaParams: JMap[String, String], - fromOffsets: JMap[TopicAndPartition, JLong], - messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaInputDStream[R] = { - implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) - implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) - implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) - implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) - val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _) - createDirectStream[K, V, KD, VD, R]( - jssc.ssc, - Map(kafkaParams.asScala.toSeq: _*), - Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*), - cleanedHandler - ) - } - - /** - * Create an input stream that directly pulls messages from Kafka Brokers - * without using any receiver. This stream can guarantee that each message - * from Kafka is included in transformations exactly once (see points below). - * - * Points to note: - * - No receivers: This stream does not use any receiver. It directly queries Kafka - * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked - * by the stream itself. For interoperability with Kafka monitoring tools that depend on - * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. - * You can access the offsets used in each batch from the generated RDDs (see - * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). - * - Failure Recovery: To recover from driver failures, you have to enable checkpointing - * in the [[StreamingContext]]. The information on consumed offset can be - * recovered from the checkpoint. See the programming guide for details (constraints, etc.). - * - End-to-end semantics: This stream ensures that every records is effectively received and - * transformed exactly once, but gives no guarantees on whether the transformed data are - * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure - * that the output operation is idempotent, or use transactions to output records atomically. - * See the programming guide for more details. - * - * @param jssc JavaStreamingContext object - * @param keyClass Class of the keys in the Kafka records - * @param valueClass Class of the values in the Kafka records - * @param keyDecoderClass Class of the key decoder - * @param valueDecoderClass Class type of the value decoder - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" - * to be set with Kafka broker(s) (NOT zookeeper servers), specified in - * host1:port1,host2:port2 form. - * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" - * to determine where the stream starts (defaults to "largest") - * @param topics Names of the topics to consume - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - * @tparam KD type of Kafka message key decoder - * @tparam VD type of Kafka message value decoder - * @return DStream of (Kafka message key, Kafka message value) - */ - def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( - jssc: JavaStreamingContext, - keyClass: Class[K], - valueClass: Class[V], - keyDecoderClass: Class[KD], - valueDecoderClass: Class[VD], - kafkaParams: JMap[String, String], - topics: JSet[String] - ): JavaPairInputDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) - implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) - implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) - implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - createDirectStream[K, V, KD, VD]( - jssc.ssc, - Map(kafkaParams.asScala.toSeq: _*), - Set(topics.asScala.toSeq: _*) - ) - } -} - -/** - * This is a helper class that wraps the KafkaUtils.createStream() into more - * Python-friendly class and function so that it can be easily - * instantiated and called from Python's KafkaUtils. - * - * The zero-arg constructor helps instantiate this class from the Class object - * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() - * takes care of known parameters instead of passing them from Python - */ -private[kafka] class KafkaUtilsPythonHelper { - import KafkaUtilsPythonHelper._ - - def createStream( - jssc: JavaStreamingContext, - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = { - KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( - jssc, - classOf[Array[Byte]], - classOf[Array[Byte]], - classOf[DefaultDecoder], - classOf[DefaultDecoder], - kafkaParams, - topics, - storageLevel) - } - - def createRDDWithoutMessageHandler( - jsc: JavaSparkContext, - kafkaParams: JMap[String, String], - offsetRanges: JList[OffsetRange], - leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = { - val messageHandler = - (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message) - new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler)) - } - - def createRDDWithMessageHandler( - jsc: JavaSparkContext, - kafkaParams: JMap[String, String], - offsetRanges: JList[OffsetRange], - leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = { - val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => - new PythonMessageAndMetadata( - mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message()) - val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler). - mapPartitions(picklerIterator) - new JavaRDD(rdd) - } - - private def createRDD[V: ClassTag]( - jsc: JavaSparkContext, - kafkaParams: JMap[String, String], - offsetRanges: JList[OffsetRange], - leaders: JMap[TopicAndPartition, Broker], - messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = { - KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V]( - jsc.sc, - kafkaParams.asScala.toMap, - offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())), - leaders.asScala.toMap, - messageHandler - ) - } - - def createDirectStreamWithoutMessageHandler( - jssc: JavaStreamingContext, - kafkaParams: JMap[String, String], - topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = { - val messageHandler = - (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message) - new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler)) - } - - def createDirectStreamWithMessageHandler( - jssc: JavaStreamingContext, - kafkaParams: JMap[String, String], - topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = { - val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => - new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message()) - val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler). - mapPartitions(picklerIterator) - new JavaDStream(stream) - } - - private def createDirectStream[V: ClassTag]( - jssc: JavaStreamingContext, - kafkaParams: JMap[String, String], - topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong], - messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = { - - val currentFromOffsets = if (!fromOffsets.isEmpty) { - val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic) - if (topicsFromOffsets != topics.asScala.toSet) { - throw new IllegalStateException( - s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " + - s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}") - } - Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*) - } else { - val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*)) - KafkaUtils.getFromOffsets( - kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*)) - } - - KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V]( - jssc.ssc, - Map(kafkaParams.asScala.toSeq: _*), - Map(currentFromOffsets.toSeq: _*), - messageHandler) - } - - def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong - ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset) - - def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition = - TopicAndPartition(topic, partition) - - def createBroker(host: String, port: JInt): Broker = Broker(host, port) - - def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = { - val parentRDDs = rdd.getNarrowAncestors - val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]]) - - require( - kafkaRDDs.length == 1, - "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" + - "with this RDD, please call this method only on a Kafka RDD.") - - val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]] - kafkaRDD.offsetRanges.toSeq.asJava - } -} - -private object KafkaUtilsPythonHelper { - private var initialized = false - - def initialize(): Unit = { - SerDeUtil.initialize() - synchronized { - if (!initialized) { - new PythonMessageAndMetadataPickler().register() - initialized = true - } - } - } - - initialize() - - def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = { - new SerDeUtil.AutoBatchedPickler(iter) - } - - case class PythonMessageAndMetadata( - topic: String, - partition: JInt, - offset: JLong, - key: Array[Byte], - message: Array[Byte]) - - class PythonMessageAndMetadataPickler extends IObjectPickler { - private val module = "pyspark.streaming.kafka" - - def register(): Unit = { - Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this) - Pickler.registerCustomPickler(this.getClass, this) - } - - def pickle(obj: Object, out: OutputStream, pickler: Pickler) { - if (obj == this) { - out.write(Opcodes.GLOBAL) - out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8)) - } else { - pickler.save(this) - val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata] - out.write(Opcodes.MARK) - pickler.save(msgAndMetaData.topic) - pickler.save(msgAndMetaData.partition) - pickler.save(msgAndMetaData.offset) - pickler.save(msgAndMetaData.key) - pickler.save(msgAndMetaData.message) - out.write(Opcodes.TUPLE) - out.write(Opcodes.REDUCE) - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala deleted file mode 100644 index d9b856e..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import kafka.common.TopicAndPartition - -/** - * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the - * offset ranges in RDDs generated by the direct Kafka DStream (see - * [[KafkaUtils.createDirectStream()]]). - * {{{ - * KafkaUtils.createDirectStream(...).foreachRDD { rdd => - * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - * ... - * } - * }}} - */ -trait HasOffsetRanges { - def offsetRanges: Array[OffsetRange] -} - -/** - * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class - * can be created with `OffsetRange.create()`. - * @param topic Kafka topic name - * @param partition Kafka partition id - * @param fromOffset Inclusive starting offset - * @param untilOffset Exclusive ending offset - */ -final class OffsetRange private( - val topic: String, - val partition: Int, - val fromOffset: Long, - val untilOffset: Long) extends Serializable { - import OffsetRange.OffsetRangeTuple - - /** Kafka TopicAndPartition object, for convenience */ - def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition) - - /** Number of messages this OffsetRange refers to */ - def count(): Long = untilOffset - fromOffset - - override def equals(obj: Any): Boolean = obj match { - case that: OffsetRange => - this.topic == that.topic && - this.partition == that.partition && - this.fromOffset == that.fromOffset && - this.untilOffset == that.untilOffset - case _ => false - } - - override def hashCode(): Int = { - toTuple.hashCode() - } - - override def toString(): String = { - s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])" - } - - /** this is to avoid ClassNotFoundException during checkpoint restore */ - private[streaming] - def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) -} - -/** - * Companion object the provides methods to create instances of [[OffsetRange]]. - */ -object OffsetRange { - def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = - new OffsetRange(topic, partition, fromOffset, untilOffset) - - def create( - topicAndPartition: TopicAndPartition, - fromOffset: Long, - untilOffset: Long): OffsetRange = - new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) - - def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = - new OffsetRange(topic, partition, fromOffset, untilOffset) - - def apply( - topicAndPartition: TopicAndPartition, - fromOffset: Long, - untilOffset: Long): OffsetRange = - new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) - - /** this is to avoid ClassNotFoundException during checkpoint restore */ - private[kafka] - type OffsetRangeTuple = (String, Int, Long, Long) - - private[kafka] - def apply(t: OffsetRangeTuple) = - new OffsetRange(t._1, t._2, t._3, t._4) -} http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala deleted file mode 100644 index 39abe3c..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.util.Properties -import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor} - -import scala.collection.{mutable, Map} -import scala.reflect.{classTag, ClassTag} - -import kafka.common.TopicAndPartition -import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} -import kafka.message.MessageAndMetadata -import kafka.serializer.Decoder -import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils} -import org.I0Itec.zkclient.ZkClient - -import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.ThreadUtils - -/** - * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. - * It is turned off by default and will be enabled when - * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver - * is that this receiver manages topic-partition/offset itself and updates the offset information - * after data is reliably stored as write-ahead log. Offsets will only be updated when data is - * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated. - * - * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset - * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams - * will not take effect. - */ -private[streaming] -class ReliableKafkaReceiver[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel) - extends Receiver[(K, V)](storageLevel) with Logging { - - private val groupId = kafkaParams("group.id") - private val AUTO_OFFSET_COMMIT = "auto.commit.enable" - private def conf = SparkEnv.get.conf - - /** High level consumer to connect to Kafka. */ - private var consumerConnector: ConsumerConnector = null - - /** zkClient to connect to Zookeeper to commit the offsets. */ - private var zkClient: ZkClient = null - - /** - * A HashMap to manage the offset for each topic/partition, this HashMap is called in - * synchronized block, so mutable HashMap will not meet concurrency issue. - */ - private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null - - /** A concurrent HashMap to store the stream block id and related offset snapshot. */ - private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null - - /** - * Manage the BlockGenerator in receiver itself for better managing block store and offset - * commit. - */ - private var blockGenerator: BlockGenerator = null - - /** Thread pool running the handlers for receiving message from multiple topics and partitions. */ - private var messageHandlerThreadPool: ThreadPoolExecutor = null - - override def onStart(): Unit = { - logInfo(s"Starting Kafka Consumer Stream with group: $groupId") - - // Initialize the topic-partition / offset hash map. - topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] - - // Initialize the stream block id / offset snapshot hash map. - blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() - - // Initialize the block generator for storing Kafka message. - blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) - - if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") { - logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " + - "otherwise we will manually set it to false to turn off auto offset commit in Kafka") - } - - val props = new Properties() - kafkaParams.foreach(param => props.put(param._1, param._2)) - // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true, - // we have to make sure this property is set to false to turn off auto commit mechanism in - // Kafka. - props.setProperty(AUTO_OFFSET_COMMIT, "false") - - val consumerConfig = new ConsumerConfig(props) - - assert(!consumerConfig.autoCommitEnable) - - logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}") - consumerConnector = Consumer.create(consumerConfig) - logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}") - - zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, - consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) - - messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( - topics.values.sum, "KafkaMessageHandler") - - blockGenerator.start() - - val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[K]] - - val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[V]] - - val topicMessageStreams = consumerConnector.createMessageStreams( - topics, keyDecoder, valueDecoder) - - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => - messageHandlerThreadPool.submit(new MessageHandler(stream)) - } - } - } - - override def onStop(): Unit = { - if (messageHandlerThreadPool != null) { - messageHandlerThreadPool.shutdown() - messageHandlerThreadPool = null - } - - if (consumerConnector != null) { - consumerConnector.shutdown() - consumerConnector = null - } - - if (zkClient != null) { - zkClient.close() - zkClient = null - } - - if (blockGenerator != null) { - blockGenerator.stop() - blockGenerator = null - } - - if (topicPartitionOffsetMap != null) { - topicPartitionOffsetMap.clear() - topicPartitionOffsetMap = null - } - - if (blockOffsetMap != null) { - blockOffsetMap.clear() - blockOffsetMap = null - } - } - - /** Store a Kafka message and the associated metadata as a tuple. */ - private def storeMessageAndMetadata( - msgAndMetadata: MessageAndMetadata[K, V]): Unit = { - val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition) - val data = (msgAndMetadata.key, msgAndMetadata.message) - val metadata = (topicAndPartition, msgAndMetadata.offset) - blockGenerator.addDataWithCallback(data, metadata) - } - - /** Update stored offset */ - private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = { - topicPartitionOffsetMap.put(topicAndPartition, offset) - } - - /** - * Remember the current offsets for each topic and partition. This is called when a block is - * generated. - */ - private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { - // Get a snapshot of current offset map and store with related block id. - val offsetSnapshot = topicPartitionOffsetMap.toMap - blockOffsetMap.put(blockId, offsetSnapshot) - topicPartitionOffsetMap.clear() - } - - /** - * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method - * will try a fixed number of times to push the block. If the push fails, the receiver is stopped. - */ - private def storeBlockAndCommitOffset( - blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { - var count = 0 - var pushed = false - var exception: Exception = null - while (!pushed && count <= 3) { - try { - store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]) - pushed = true - } catch { - case ex: Exception => - count += 1 - exception = ex - } - } - if (pushed) { - Option(blockOffsetMap.get(blockId)).foreach(commitOffset) - blockOffsetMap.remove(blockId) - } else { - stop("Error while storing block into Spark", exception) - } - } - - /** - * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's - * metadata schema in Zookeeper. - */ - private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = { - if (zkClient == null) { - val thrown = new IllegalStateException("Zookeeper client is unexpectedly null") - stop("Zookeeper client is not initialized before commit offsets to ZK", thrown) - return - } - - for ((topicAndPart, offset) <- offsetMap) { - try { - val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic) - val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}" - - ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) - } catch { - case e: Exception => - logWarning(s"Exception during commit offset $offset for topic" + - s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e) - } - - logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + - s"partition ${topicAndPart.partition}") - } - } - - /** Class to handle received Kafka message. */ - private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { - override def run(): Unit = { - while (!isStopped) { - try { - val streamIterator = stream.iterator() - while (streamIterator.hasNext) { - storeMessageAndMetadata(streamIterator.next) - } - } catch { - case e: Exception => - reportError("Error handling message", e) - } - } - } - } - - /** Class to handle blocks generated by the block generator. */ - private final class GeneratedBlockHandler extends BlockGeneratorListener { - - def onAddData(data: Any, metadata: Any): Unit = { - // Update the offset of the data that was added to the generator - if (metadata != null) { - val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] - updateOffset(topicAndPartition, offset) - } - } - - def onGenerateBlock(blockId: StreamBlockId): Unit = { - // Remember the offsets of topics/partitions when a block has been generated - rememberBlockOffsets(blockId) - } - - def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { - // Store block and commit the blocks offset - storeBlockAndCommitOffset(blockId, arrayBuffer) - } - - def onError(message: String, throwable: Throwable): Unit = { - reportError(message, throwable) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java deleted file mode 100644 index 2e5ab0f..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Kafka receiver for spark streaming. - */ -package org.apache.spark.streaming.kafka; http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala deleted file mode 100644 index 47c5187..0000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -/** - * Kafka receiver for spark streaming, - */ -package object kafka http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java deleted file mode 100644 index fa6b0db..0000000 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka; - -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; - -import scala.Tuple2; - -import kafka.common.TopicAndPartition; -import kafka.message.MessageAndMetadata; -import kafka.serializer.StringDecoder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; - -public class JavaDirectKafkaStreamSuite implements Serializable { - private transient JavaStreamingContext ssc = null; - private transient KafkaTestUtils kafkaTestUtils = null; - - @Before - public void setUp() { - kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setup(); - SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); - ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); - } - - @After - public void tearDown() { - if (ssc != null) { - ssc.stop(); - ssc = null; - } - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown(); - kafkaTestUtils = null; - } - } - - @Test - public void testKafkaStream() throws InterruptedException { - final String topic1 = "topic1"; - final String topic2 = "topic2"; - // hold a reference to the current offset ranges, so it can be used downstream - final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); - - String[] topic1data = createTopicAndSendData(topic1); - String[] topic2data = createTopicAndSendData(topic2); - - Set<String> sent = new HashSet<>(); - sent.addAll(Arrays.asList(topic1data)); - sent.addAll(Arrays.asList(topic2data)); - - Map<String, String> kafkaParams = new HashMap<>(); - kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - kafkaParams.put("auto.offset.reset", "smallest"); - - JavaDStream<String> stream1 = KafkaUtils.createDirectStream( - ssc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topicToSet(topic1) - ).transformToPair( - // Make sure you can get offset ranges from the rdd - new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { - @Override - public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - offsetRanges.set(offsets); - Assert.assertEquals(topic1, offsets[0].topic()); - return rdd; - } - } - ).map( - new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> kv) { - return kv._2(); - } - } - ); - - JavaDStream<String> stream2 = KafkaUtils.createDirectStream( - ssc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - String.class, - kafkaParams, - topicOffsetToMap(topic2, 0L), - new Function<MessageAndMetadata<String, String>, String>() { - @Override - public String call(MessageAndMetadata<String, String> msgAndMd) { - return msgAndMd.message(); - } - } - ); - JavaDStream<String> unifiedStream = stream1.union(stream2); - - final Set<String> result = Collections.synchronizedSet(new HashSet<String>()); - unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { - @Override - public void call(JavaRDD<String> rdd) { - result.addAll(rdd.collect()); - for (OffsetRange o : offsetRanges.get()) { - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() - ); - } - } - } - ); - ssc.start(); - long startTime = System.currentTimeMillis(); - boolean matches = false; - while (!matches && System.currentTimeMillis() - startTime < 20000) { - matches = sent.size() == result.size(); - Thread.sleep(50); - } - Assert.assertEquals(sent, result); - ssc.stop(); - } - - private static Set<String> topicToSet(String topic) { - Set<String> topicSet = new HashSet<>(); - topicSet.add(topic); - return topicSet; - } - - private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { - Map<TopicAndPartition, Long> topicMap = new HashMap<>(); - topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); - return topicMap; - } - - private String[] createTopicAndSendData(String topic) { - String[] data = { topic + "-1", topic + "-2", topic + "-3"}; - kafkaTestUtils.createTopic(topic, 1); - kafkaTestUtils.sendMessages(topic, data); - return data; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
