http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala new file mode 100644 index 0000000..edaafb9 --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.io.OutputStream +import java.lang.{Integer => JInt, Long => JLong} +import java.nio.charset.StandardCharsets +import java.util.{List => JList, Map => JMap, Set => JSet} + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} +import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.python.SerDeUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java._ +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} +import org.apache.spark.streaming.util.WriteAheadLogUtils + +object KafkaUtils { + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream( + ssc: StreamingContext, + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + createStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param kafkaParams Map of kafka configuration parameters, + * see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam U type of Kafka message key decoder + * @tparam T type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( + ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): ReceiverInputDStream[(K, V)] = { + val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) + new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairReceiverInputDStream[String, String] = { + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairReceiverInputDStream[String, String] = { + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), + storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param jssc JavaStreamingContext object + * @param keyTypeClass Key type of DStream + * @param valueTypeClass value type of Dstream + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration parameters, + * see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + * @param storageLevel RDD storage level. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam U type of Kafka message key decoder + * @tparam T type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + jssc: JavaStreamingContext, + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairReceiverInputDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass) + + implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass) + implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass) + + createStream[K, V, U, T]( + jssc.ssc, + kafkaParams.asScala.toMap, + Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), + storageLevel) + } + + /** get leaders for the given offset ranges, or throw an exception */ + private def leadersForRanges( + kc: KafkaCluster, + offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = { + val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet + val leaders = kc.findLeaders(topics) + KafkaCluster.checkErrors(leaders) + } + + /** Make sure offsets are available in kafka, or throw an exception */ + private def checkOffsets( + kc: KafkaCluster, + offsetRanges: Array[OffsetRange]): Unit = { + val topics = offsetRanges.map(_.topicAndPartition).toSet + val result = for { + low <- kc.getEarliestLeaderOffsets(topics).right + high <- kc.getLatestLeaderOffsets(topics).right + } yield { + offsetRanges.filterNot { o => + low(o.topicAndPartition).offset <= o.fromOffset && + o.untilOffset <= high(o.topicAndPartition).offset + } + } + val badRanges = KafkaCluster.checkErrors(result) + if (!badRanges.isEmpty) { + throw new SparkException("Offsets not available on leader: " + badRanges.mkString(",")) + } + } + + private[kafka] def getFromOffsets( + kc: KafkaCluster, + kafkaParams: Map[String, String], + topics: Set[String] + ): Map[TopicAndPartition, Long] = { + val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) + val result = for { + topicPartitions <- kc.getPartitions(topics).right + leaderOffsets <- (if (reset == Some("smallest")) { + kc.getEarliestLeaderOffsets(topicPartitions) + } else { + kc.getLatestLeaderOffsets(topicPartitions) + }).right + } yield { + leaderOffsets.map { case (tp, lo) => + (tp, lo.offset) + } + } + KafkaCluster.checkErrors(result) + } + + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * + * @param sc SparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @return RDD of (Kafka message key, Kafka message value) + */ + def createRDD[ + K: ClassTag, + V: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag]( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange] + ): RDD[(K, V)] = sc.withScope { + val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) + val kc = new KafkaCluster(kafkaParams) + val leaders = leadersForRanges(kc, offsetRanges) + checkOffsets(kc, offsetRanges) + new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + } + + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * + * @param sc SparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, + * in which case leaders will be looked up on the driver. + * @param messageHandler Function for translating each message and metadata into the desired type + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @tparam R type returned by messageHandler + * @return RDD of R + */ + def createRDD[ + K: ClassTag, + V: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, + R: ClassTag]( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange], + leaders: Map[TopicAndPartition, Broker], + messageHandler: MessageAndMetadata[K, V] => R + ): RDD[R] = sc.withScope { + val kc = new KafkaCluster(kafkaParams) + val leaderMap = if (leaders.isEmpty) { + leadersForRanges(kc, offsetRanges) + } else { + // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker + leaders.map { + case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) + } + } + val cleanedHandler = sc.clean(messageHandler) + checkOffsets(kc, offsetRanges) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler) + } + + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param keyClass type of Kafka message key + * @param valueClass type of Kafka message value + * @param keyDecoderClass type of Kafka message key decoder + * @param valueDecoderClass type of Kafka message value decoder + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @return RDD of (Kafka message key, Kafka message value) + */ + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange] + ): JavaPairRDD[K, V] = jsc.sc.withScope { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + new JavaPairRDD(createRDD[K, V, KD, VD]( + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges)) + } + + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, + * in which case leaders will be looked up on the driver. + * @param messageHandler Function for translating each message and metadata into the desired type + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @tparam R type returned by messageHandler + * @return RDD of R + */ + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange], + leaders: JMap[TopicAndPartition, Broker], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaRDD[R] = jsc.sc.withScope { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + val leaderMap = Map(leaders.asScala.toSeq: _*) + createRDD[K, V, KD, VD, R]( + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_)) + } + + /** + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param ssc StreamingContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @tparam R type returned by messageHandler + * @return DStream of R + */ + def createDirectStream[ + K: ClassTag, + V: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, + R: ClassTag] ( + ssc: StreamingContext, + kafkaParams: Map[String, String], + fromOffsets: Map[TopicAndPartition, Long], + messageHandler: MessageAndMetadata[K, V] => R + ): InputDStream[R] = { + val cleanedHandler = ssc.sc.clean(messageHandler) + new DirectKafkaInputDStream[K, V, KD, VD, R]( + ssc, kafkaParams, fromOffsets, cleanedHandler) + } + + /** + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param ssc StreamingContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * to determine where the stream starts (defaults to "largest") + * @param topics Names of the topics to consume + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createDirectStream[ + K: ClassTag, + V: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag] ( + ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Set[String] + ): InputDStream[(K, V)] = { + val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) + val kc = new KafkaCluster(kafkaParams) + val fromOffsets = getFromOffsets(kc, kafkaParams, topics) + new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( + ssc, kafkaParams, fromOffsets, messageHandler) + } + + /** + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class of the value decoder + * @param recordClass Class of the records in DStream + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each message and metadata into the desired type + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @tparam R type returned by messageHandler + * @return DStream of R + */ + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + fromOffsets: JMap[TopicAndPartition, JLong], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaInputDStream[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _) + createDirectStream[K, V, KD, VD, R]( + jssc.ssc, + Map(kafkaParams.asScala.toSeq: _*), + Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*), + cleanedHandler + ) + } + + /** + * Create an input stream that directly pulls messages from Kafka Brokers + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class type of the value decoder + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * to determine where the stream starts (defaults to "largest") + * @param topics Names of the topics to consume + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam KD type of Kafka message key decoder + * @tparam VD type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + topics: JSet[String] + ): JavaPairInputDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + createDirectStream[K, V, KD, VD]( + jssc.ssc, + Map(kafkaParams.asScala.toSeq: _*), + Set(topics.asScala.toSeq: _*) + ) + } +} + +/** + * This is a helper class that wraps the KafkaUtils.createStream() into more + * Python-friendly class and function so that it can be easily + * instantiated and called from Python's KafkaUtils. + * + * The zero-arg constructor helps instantiate this class from the Class object + * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() + * takes care of known parameters instead of passing them from Python + */ +private[kafka] class KafkaUtilsPythonHelper { + import KafkaUtilsPythonHelper._ + + def createStream( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = { + KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( + jssc, + classOf[Array[Byte]], + classOf[Array[Byte]], + classOf[DefaultDecoder], + classOf[DefaultDecoder], + kafkaParams, + topics, + storageLevel) + } + + def createRDDWithoutMessageHandler( + jsc: JavaSparkContext, + kafkaParams: JMap[String, String], + offsetRanges: JList[OffsetRange], + leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = { + val messageHandler = + (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message) + new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler)) + } + + def createRDDWithMessageHandler( + jsc: JavaSparkContext, + kafkaParams: JMap[String, String], + offsetRanges: JList[OffsetRange], + leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = { + val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => + new PythonMessageAndMetadata( + mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message()) + val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler). + mapPartitions(picklerIterator) + new JavaRDD(rdd) + } + + private def createRDD[V: ClassTag]( + jsc: JavaSparkContext, + kafkaParams: JMap[String, String], + offsetRanges: JList[OffsetRange], + leaders: JMap[TopicAndPartition, Broker], + messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = { + KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V]( + jsc.sc, + kafkaParams.asScala.toMap, + offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())), + leaders.asScala.toMap, + messageHandler + ) + } + + def createDirectStreamWithoutMessageHandler( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JSet[String], + fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = { + val messageHandler = + (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message) + new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler)) + } + + def createDirectStreamWithMessageHandler( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JSet[String], + fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = { + val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => + new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message()) + val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler). + mapPartitions(picklerIterator) + new JavaDStream(stream) + } + + private def createDirectStream[V: ClassTag]( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JSet[String], + fromOffsets: JMap[TopicAndPartition, JLong], + messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = { + + val currentFromOffsets = if (!fromOffsets.isEmpty) { + val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic) + if (topicsFromOffsets != topics.asScala.toSet) { + throw new IllegalStateException( + s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " + + s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}") + } + Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*) + } else { + val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*)) + KafkaUtils.getFromOffsets( + kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*)) + } + + KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V]( + jssc.ssc, + Map(kafkaParams.asScala.toSeq: _*), + Map(currentFromOffsets.toSeq: _*), + messageHandler) + } + + def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong + ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset) + + def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition = + TopicAndPartition(topic, partition) + + def createBroker(host: String, port: JInt): Broker = Broker(host, port) + + def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = { + val parentRDDs = rdd.getNarrowAncestors + val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]]) + + require( + kafkaRDDs.length == 1, + "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" + + "with this RDD, please call this method only on a Kafka RDD.") + + val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]] + kafkaRDD.offsetRanges.toSeq.asJava + } +} + +private object KafkaUtilsPythonHelper { + private var initialized = false + + def initialize(): Unit = { + SerDeUtil.initialize() + synchronized { + if (!initialized) { + new PythonMessageAndMetadataPickler().register() + initialized = true + } + } + } + + initialize() + + def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = { + new SerDeUtil.AutoBatchedPickler(iter) + } + + case class PythonMessageAndMetadata( + topic: String, + partition: JInt, + offset: JLong, + key: Array[Byte], + message: Array[Byte]) + + class PythonMessageAndMetadataPickler extends IObjectPickler { + private val module = "pyspark.streaming.kafka" + + def register(): Unit = { + Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this) + Pickler.registerCustomPickler(this.getClass, this) + } + + def pickle(obj: Object, out: OutputStream, pickler: Pickler) { + if (obj == this) { + out.write(Opcodes.GLOBAL) + out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8)) + } else { + pickler.save(this) + val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata] + out.write(Opcodes.MARK) + pickler.save(msgAndMetaData.topic) + pickler.save(msgAndMetaData.partition) + pickler.save(msgAndMetaData.offset) + pickler.save(msgAndMetaData.key) + pickler.save(msgAndMetaData.message) + out.write(Opcodes.TUPLE) + out.write(Opcodes.REDUCE) + } + } + } +}
http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala new file mode 100644 index 0000000..d9b856e --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.common.TopicAndPartition + +/** + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +trait HasOffsetRanges { + def offsetRanges: Array[OffsetRange] +} + +/** + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. + * @param topic Kafka topic name + * @param partition Kafka partition id + * @param fromOffset Inclusive starting offset + * @param untilOffset Exclusive ending offset + */ +final class OffsetRange private( + val topic: String, + val partition: Int, + val fromOffset: Long, + val untilOffset: Long) extends Serializable { + import OffsetRange.OffsetRangeTuple + + /** Kafka TopicAndPartition object, for convenience */ + def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition) + + /** Number of messages this OffsetRange refers to */ + def count(): Long = untilOffset - fromOffset + + override def equals(obj: Any): Boolean = obj match { + case that: OffsetRange => + this.topic == that.topic && + this.partition == that.partition && + this.fromOffset == that.fromOffset && + this.untilOffset == that.untilOffset + case _ => false + } + + override def hashCode(): Int = { + toTuple.hashCode() + } + + override def toString(): String = { + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])" + } + + /** this is to avoid ClassNotFoundException during checkpoint restore */ + private[streaming] + def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) +} + +/** + * Companion object the provides methods to create instances of [[OffsetRange]]. + */ +object OffsetRange { + def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset) + + def create( + topicAndPartition: TopicAndPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = + new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) + + def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset) + + def apply( + topicAndPartition: TopicAndPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = + new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) + + /** this is to avoid ClassNotFoundException during checkpoint restore */ + private[kafka] + type OffsetRangeTuple = (String, Int, Long, Long) + + private[kafka] + def apply(t: OffsetRangeTuple) = + new OffsetRange(t._1, t._2, t._3, t._4) +} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala new file mode 100644 index 0000000..39abe3c --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.util.Properties +import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor} + +import scala.collection.{mutable, Map} +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder +import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils} +import org.I0Itec.zkclient.ZkClient + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.storage.{StorageLevel, StreamBlockId} +import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} +import org.apache.spark.util.ThreadUtils + +/** + * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. + * It is turned off by default and will be enabled when + * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver + * is that this receiver manages topic-partition/offset itself and updates the offset information + * after data is reliably stored as write-ahead log. Offsets will only be updated when data is + * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated. + * + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset + * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams + * will not take effect. + */ +private[streaming] +class ReliableKafkaReceiver[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel) + extends Receiver[(K, V)](storageLevel) with Logging { + + private val groupId = kafkaParams("group.id") + private val AUTO_OFFSET_COMMIT = "auto.commit.enable" + private def conf = SparkEnv.get.conf + + /** High level consumer to connect to Kafka. */ + private var consumerConnector: ConsumerConnector = null + + /** zkClient to connect to Zookeeper to commit the offsets. */ + private var zkClient: ZkClient = null + + /** + * A HashMap to manage the offset for each topic/partition, this HashMap is called in + * synchronized block, so mutable HashMap will not meet concurrency issue. + */ + private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null + + /** A concurrent HashMap to store the stream block id and related offset snapshot. */ + private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null + + /** + * Manage the BlockGenerator in receiver itself for better managing block store and offset + * commit. + */ + private var blockGenerator: BlockGenerator = null + + /** Thread pool running the handlers for receiving message from multiple topics and partitions. */ + private var messageHandlerThreadPool: ThreadPoolExecutor = null + + override def onStart(): Unit = { + logInfo(s"Starting Kafka Consumer Stream with group: $groupId") + + // Initialize the topic-partition / offset hash map. + topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] + + // Initialize the stream block id / offset snapshot hash map. + blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() + + // Initialize the block generator for storing Kafka message. + blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) + + if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") { + logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " + + "otherwise we will manually set it to false to turn off auto offset commit in Kafka") + } + + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true, + // we have to make sure this property is set to false to turn off auto commit mechanism in + // Kafka. + props.setProperty(AUTO_OFFSET_COMMIT, "false") + + val consumerConfig = new ConsumerConfig(props) + + assert(!consumerConfig.autoCommitEnable) + + logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}") + consumerConnector = Consumer.create(consumerConfig) + logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}") + + zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, + consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) + + messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( + topics.values.sum, "KafkaMessageHandler") + + blockGenerator.start() + + val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + + val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => + messageHandlerThreadPool.submit(new MessageHandler(stream)) + } + } + } + + override def onStop(): Unit = { + if (messageHandlerThreadPool != null) { + messageHandlerThreadPool.shutdown() + messageHandlerThreadPool = null + } + + if (consumerConnector != null) { + consumerConnector.shutdown() + consumerConnector = null + } + + if (zkClient != null) { + zkClient.close() + zkClient = null + } + + if (blockGenerator != null) { + blockGenerator.stop() + blockGenerator = null + } + + if (topicPartitionOffsetMap != null) { + topicPartitionOffsetMap.clear() + topicPartitionOffsetMap = null + } + + if (blockOffsetMap != null) { + blockOffsetMap.clear() + blockOffsetMap = null + } + } + + /** Store a Kafka message and the associated metadata as a tuple. */ + private def storeMessageAndMetadata( + msgAndMetadata: MessageAndMetadata[K, V]): Unit = { + val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition) + val data = (msgAndMetadata.key, msgAndMetadata.message) + val metadata = (topicAndPartition, msgAndMetadata.offset) + blockGenerator.addDataWithCallback(data, metadata) + } + + /** Update stored offset */ + private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = { + topicPartitionOffsetMap.put(topicAndPartition, offset) + } + + /** + * Remember the current offsets for each topic and partition. This is called when a block is + * generated. + */ + private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { + // Get a snapshot of current offset map and store with related block id. + val offsetSnapshot = topicPartitionOffsetMap.toMap + blockOffsetMap.put(blockId, offsetSnapshot) + topicPartitionOffsetMap.clear() + } + + /** + * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method + * will try a fixed number of times to push the block. If the push fails, the receiver is stopped. + */ + private def storeBlockAndCommitOffset( + blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + var count = 0 + var pushed = false + var exception: Exception = null + while (!pushed && count <= 3) { + try { + store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]) + pushed = true + } catch { + case ex: Exception => + count += 1 + exception = ex + } + } + if (pushed) { + Option(blockOffsetMap.get(blockId)).foreach(commitOffset) + blockOffsetMap.remove(blockId) + } else { + stop("Error while storing block into Spark", exception) + } + } + + /** + * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's + * metadata schema in Zookeeper. + */ + private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = { + if (zkClient == null) { + val thrown = new IllegalStateException("Zookeeper client is unexpectedly null") + stop("Zookeeper client is not initialized before commit offsets to ZK", thrown) + return + } + + for ((topicAndPart, offset) <- offsetMap) { + try { + val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic) + val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}" + + ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) + } catch { + case e: Exception => + logWarning(s"Exception during commit offset $offset for topic" + + s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e) + } + + logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + + s"partition ${topicAndPart.partition}") + } + } + + /** Class to handle received Kafka message. */ + private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { + override def run(): Unit = { + while (!isStopped) { + try { + val streamIterator = stream.iterator() + while (streamIterator.hasNext) { + storeMessageAndMetadata(streamIterator.next) + } + } catch { + case e: Exception => + reportError("Error handling message", e) + } + } + } + } + + /** Class to handle blocks generated by the block generator. */ + private final class GeneratedBlockHandler extends BlockGeneratorListener { + + def onAddData(data: Any, metadata: Any): Unit = { + // Update the offset of the data that was added to the generator + if (metadata != null) { + val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] + updateOffset(topicAndPartition, offset) + } + } + + def onGenerateBlock(blockId: StreamBlockId): Unit = { + // Remember the offsets of topics/partitions when a block has been generated + rememberBlockOffsets(blockId) + } + + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + // Store block and commit the blocks offset + storeBlockAndCommitOffset(blockId, arrayBuffer) + } + + def onError(message: String, throwable: Throwable): Unit = { + reportError(message, throwable) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java new file mode 100644 index 0000000..2e5ab0f --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Kafka receiver for spark streaming. + */ +package org.apache.spark.streaming.kafka; http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala new file mode 100644 index 0000000..47c5187 --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +/** + * Kafka receiver for spark streaming, + */ +package object kafka http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java new file mode 100644 index 0000000..fa6b0db --- /dev/null +++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +import scala.Tuple2; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +public class JavaDirectKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient KafkaTestUtils kafkaTestUtils = null; + + @Before + public void setUp() { + kafkaTestUtils = new KafkaTestUtils(); + kafkaTestUtils.setup(); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + } + + @After + public void tearDown() { + if (ssc != null) { + ssc.stop(); + ssc = null; + } + + if (kafkaTestUtils != null) { + kafkaTestUtils.teardown(); + kafkaTestUtils = null; + } + } + + @Test + public void testKafkaStream() throws InterruptedException { + final String topic1 = "topic1"; + final String topic2 = "topic2"; + // hold a reference to the current offset ranges, so it can be used downstream + final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); + + String[] topic1data = createTopicAndSendData(topic1); + String[] topic2data = createTopicAndSendData(topic2); + + Set<String> sent = new HashSet<>(); + sent.addAll(Arrays.asList(topic1data)); + sent.addAll(Arrays.asList(topic2data)); + + Map<String, String> kafkaParams = new HashMap<>(); + kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + kafkaParams.put("auto.offset.reset", "smallest"); + + JavaDStream<String> stream1 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicToSet(topic1) + ).transformToPair( + // Make sure you can get offset ranges from the rdd + new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { + @Override + public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { + OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + offsetRanges.set(offsets); + Assert.assertEquals(topic1, offsets[0].topic()); + return rdd; + } + } + ).map( + new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> kv) { + return kv._2(); + } + } + ); + + JavaDStream<String> stream2 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + topicOffsetToMap(topic2, 0L), + new Function<MessageAndMetadata<String, String>, String>() { + @Override + public String call(MessageAndMetadata<String, String> msgAndMd) { + return msgAndMd.message(); + } + } + ); + JavaDStream<String> unifiedStream = stream1.union(stream2); + + final Set<String> result = Collections.synchronizedSet(new HashSet<String>()); + unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { + @Override + public void call(JavaRDD<String> rdd) { + result.addAll(rdd.collect()); + for (OffsetRange o : offsetRanges.get()) { + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() + ); + } + } + } + ); + ssc.start(); + long startTime = System.currentTimeMillis(); + boolean matches = false; + while (!matches && System.currentTimeMillis() - startTime < 20000) { + matches = sent.size() == result.size(); + Thread.sleep(50); + } + Assert.assertEquals(sent, result); + ssc.stop(); + } + + private static Set<String> topicToSet(String topic) { + Set<String> topicSet = new HashSet<>(); + topicSet.add(topic); + return topicSet; + } + + private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { + Map<TopicAndPartition, Long> topicMap = new HashMap<>(); + topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); + return topicMap; + } + + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; + kafkaTestUtils.createTopic(topic, 1); + kafkaTestUtils.sendMessages(topic, data); + return data; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java new file mode 100644 index 0000000..c41b629 --- /dev/null +++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import scala.Tuple2; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +public class JavaKafkaRDDSuite implements Serializable { + private transient JavaSparkContext sc = null; + private transient KafkaTestUtils kafkaTestUtils = null; + + @Before + public void setUp() { + kafkaTestUtils = new KafkaTestUtils(); + kafkaTestUtils.setup(); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + sc = new JavaSparkContext(sparkConf); + } + + @After + public void tearDown() { + if (sc != null) { + sc.stop(); + sc = null; + } + + if (kafkaTestUtils != null) { + kafkaTestUtils.teardown(); + kafkaTestUtils = null; + } + } + + @Test + public void testKafkaRDD() throws InterruptedException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + createTopicAndSendData(topic1); + createTopicAndSendData(topic2); + + Map<String, String> kafkaParams = new HashMap<>(); + kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + + OffsetRange[] offsetRanges = { + OffsetRange.create(topic1, 0, 0, 1), + OffsetRange.create(topic2, 0, 0, 1) + }; + + Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>(); + Map<TopicAndPartition, Broker> leaders = new HashMap<>(); + String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); + Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + leaders.put(new TopicAndPartition(topic1, 0), broker); + leaders.put(new TopicAndPartition(topic2, 0), broker); + + JavaRDD<String> rdd1 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + offsetRanges + ).map( + new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> kv) { + return kv._2(); + } + } + ); + + JavaRDD<String> rdd2 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + emptyLeaders, + new Function<MessageAndMetadata<String, String>, String>() { + @Override + public String call(MessageAndMetadata<String, String> msgAndMd) { + return msgAndMd.message(); + } + } + ); + + JavaRDD<String> rdd3 = KafkaUtils.createRDD( + sc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + offsetRanges, + leaders, + new Function<MessageAndMetadata<String, String>, String>() { + @Override + public String call(MessageAndMetadata<String, String> msgAndMd) { + return msgAndMd.message(); + } + } + ); + + // just making sure the java user apis work; the scala tests handle logic corner cases + long count1 = rdd1.count(); + long count2 = rdd2.count(); + long count3 = rdd3.count(); + Assert.assertTrue(count1 > 0); + Assert.assertEquals(count1, count2); + Assert.assertEquals(count1, count3); + } + + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; + kafkaTestUtils.createTopic(topic, 1); + kafkaTestUtils.sendMessages(topic, data); + return data; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java new file mode 100644 index 0000000..868df64 --- /dev/null +++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.*; + +import scala.Tuple2; + +import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +public class JavaKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient Random random = new Random(); + private transient KafkaTestUtils kafkaTestUtils = null; + + @Before + public void setUp() { + kafkaTestUtils = new KafkaTestUtils(); + kafkaTestUtils.setup(); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, new Duration(500)); + } + + @After + public void tearDown() { + if (ssc != null) { + ssc.stop(); + ssc = null; + } + + if (kafkaTestUtils != null) { + kafkaTestUtils.teardown(); + kafkaTestUtils = null; + } + } + + @Test + public void testKafkaStream() throws InterruptedException { + String topic = "topic1"; + Map<String, Integer> topics = new HashMap<>(); + topics.put(topic, 1); + + Map<String, Integer> sent = new HashMap<>(); + sent.put("a", 5); + sent.put("b", 3); + sent.put("c", 10); + + kafkaTestUtils.createTopic(topic, 1); + kafkaTestUtils.sendMessages(topic, sent); + + Map<String, String> kafkaParams = new HashMap<>(); + kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); + kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000)); + kafkaParams.put("auto.offset.reset", "smallest"); + + JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, + StorageLevel.MEMORY_ONLY_SER()); + + final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>()); + + JavaDStream<String> words = stream.map( + new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) { + return tuple2._2(); + } + } + ); + + words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { + @Override + public void call(JavaPairRDD<String, Long> rdd) { + List<Tuple2<String, Long>> ret = rdd.collect(); + for (Tuple2<String, Long> r : ret) { + if (result.containsKey(r._1())) { + result.put(r._1(), result.get(r._1()) + r._2()); + } else { + result.put(r._1(), r._2()); + } + } + } + } + ); + + ssc.start(); + + long startTime = System.currentTimeMillis(); + boolean sizeMatches = false; + while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) { + sizeMatches = sent.size() == result.size(); + Thread.sleep(200); + } + Assert.assertEquals(sent.size(), result.size()); + for (Map.Entry<String, Integer> e : sent.entrySet()) { + Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/test/resources/log4j.properties b/external/kafka-0-8/src/test/resources/log4j.properties new file mode 100644 index 0000000..fd51f8f --- /dev/null +++ b/external/kafka-0-8/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark_project.jetty=WARN + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
