Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e3703c411 -> 56e1e2f17


http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
deleted file mode 100644
index c41b629..0000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-public class JavaKafkaRDDSuite implements Serializable {
-  private transient JavaSparkContext sc = null;
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    sc = new JavaSparkContext(sparkConf);
-  }
-
-  @After
-  public void tearDown() {
-    if (sc != null) {
-      sc.stop();
-      sc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaRDD() throws InterruptedException {
-    String topic1 = "topic1";
-    String topic2 = "topic2";
-
-    createTopicAndSendData(topic1);
-    createTopicAndSendData(topic2);
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
-
-    OffsetRange[] offsetRanges = {
-      OffsetRange.create(topic1, 0, 0, 1),
-      OffsetRange.create(topic2, 0, 0, 1)
-    };
-
-    Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
-    Map<TopicAndPartition, Broker> leaders = new HashMap<>();
-    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
-    Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
-    leaders.put(new TopicAndPartition(topic1, 0), broker);
-    leaders.put(new TopicAndPartition(topic2, 0), broker);
-
-    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        offsetRanges
-    ).map(
-        new Function<Tuple2<String, String>, String>() {
-          @Override
-          public String call(Tuple2<String, String> kv) {
-            return kv._2();
-          }
-        }
-    );
-
-    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        offsetRanges,
-        emptyLeaders,
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-
-    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        offsetRanges,
-        leaders,
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-
-    // just making sure the java user apis work; the scala tests handle logic corner cases
-    long count1 = rdd1.count();
-    long count2 = rdd2.count();
-    long count3 = rdd3.count();
-    Assert.assertTrue(count1 > 0);
-    Assert.assertEquals(count1, count2);
-    Assert.assertEquals(count1, count3);
-  }
-
-  private  String[] createTopicAndSendData(String topic) {
-    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, data);
-    return data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
deleted file mode 100644
index 868df64..0000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaKafkaStreamSuite implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private transient Random random = new Random();
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
-  }
-
-  @After
-  public void tearDown() {
-    if (ssc != null) {
-      ssc.stop();
-      ssc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaStream() throws InterruptedException {
-    String topic = "topic1";
-    Map<String, Integer> topics = new HashMap<>();
-    topics.put(topic, 1);
-
-    Map<String, Integer> sent = new HashMap<>();
-    sent.put("a", 5);
-    sent.put("b", 3);
-    sent.put("c", 10);
-
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, sent);
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
-    kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
-    kafkaParams.put("auto.offset.reset", "smallest");
-
-    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
-      String.class,
-      String.class,
-      StringDecoder.class,
-      StringDecoder.class,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY_SER());
-
-    final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
-
-    JavaDStream<String> words = stream.map(
-      new Function<Tuple2<String, String>, String>() {
-        @Override
-        public String call(Tuple2<String, String> tuple2) {
-          return tuple2._2();
-        }
-      }
-    );
-
-    words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
-        @Override
-        public void call(JavaPairRDD<String, Long> rdd) {
-          List<Tuple2<String, Long>> ret = rdd.collect();
-          for (Tuple2<String, Long> r : ret) {
-            if (result.containsKey(r._1())) {
-              result.put(r._1(), result.get(r._1()) + r._2());
-            } else {
-              result.put(r._1(), r._2());
-            }
-          }
-        }
-      }
-    );
-
-    ssc.start();
-
-    long startTime = System.currentTimeMillis();
-    boolean sizeMatches = false;
-    while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
-      sizeMatches = sent.size() == result.size();
-      Thread.sleep(200);
-    }
-    Assert.assertEquals(sent.size(), result.size());
-    for (Map.Entry<String, Integer> e : sent.entrySet()) {
-      Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
deleted file mode 100644
index fd51f8f..0000000
--- a/external/kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN
-

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
deleted file mode 100644
index cb782d2..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-import java.util.Arrays
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-import org.apache.spark.util.Utils
-
-class DirectKafkaStreamSuite
-  extends SparkFunSuite
-  with BeforeAndAfter
-  with BeforeAndAfterAll
-  with Eventually
-  with Logging {
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-  private var testDir: File = _
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll {
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll {
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown()
-      kafkaTestUtils = null
-    }
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-      sc = null
-    }
-    if (sc != null) {
-      sc.stop()
-    }
-    if (testDir != null) {
-      Utils.deleteRecursively(testDir)
-    }
-  }
-
-
-  test("basic stream receiving with multiple topics and smallest starting offset") {
-    val topics = Set("basic1", "basic2", "basic3")
-    val data = Map("a" -> 7, "b" -> 9)
-    topics.foreach { t =>
-      kafkaTestUtils.createTopic(t)
-      kafkaTestUtils.sendMessages(t, data)
-    }
-    val totalSent = data.values.sum * topics.size
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, topics)
-    }
-
-    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
-
-    // hold a reference to the current offset ranges, so it can be used downstream
-    var offsetRanges = Array[OffsetRange]()
-
-    stream.transform { rdd =>
-      // Get the offset ranges in the RDD
-      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-      rdd
-    }.foreachRDD { rdd =>
-      for (o <- offsetRanges) {
-        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
-      }
-      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
-      // For each partition, get size of the range in the partition,
-      // and the number of items in the partition
-        val off = offsetRanges(i)
-        val all = iter.toSeq
-        val partSize = all.size
-        val rangeSize = off.untilOffset - off.fromOffset
-        Iterator((partSize, rangeSize))
-      }.collect
-
-      // Verify whether number of elements in each partition
-      // matches with the corresponding offset range
-      collected.foreach { case (partSize, rangeSize) =>
-        assert(partSize === rangeSize, "offset ranges are wrong")
-      }
-    }
-    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" +
-          allReceived.asScala.mkString("\n"))
-    }
-    ssc.stop()
-  }
-
-  test("receiving from largest starting offset") {
-    val topic = "largest"
-    val topicPartition = TopicAndPartition(topic, 0)
-    val data = Map("a" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "largest"
-    )
-    val kc = new KafkaCluster(kafkaParams)
-    def getLatestOffset(): Long = {
-      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
-    }
-
-    // Send some initial messages before starting context
-    kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
-      assert(getLatestOffset() > 3)
-    }
-    val offsetBeforeStart = getLatestOffset()
-
-    // Setup context and kafka stream with largest offset
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-    assert(
-      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
-        .fromOffsets(topicPartition) >= offsetBeforeStart,
-      "Start offset not from latest"
-    )
-
-    val collectedData = new ConcurrentLinkedQueue[String]()
-    stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    val newData = Map("b" -> 10)
-    kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      collectedData.contains("b")
-    }
-    assert(!collectedData.contains("a"))
-  }
-
-
-  test("creating stream by offset") {
-    val topic = "offset"
-    val topicPartition = TopicAndPartition(topic, 0)
-    val data = Map("a" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "largest"
-    )
-    val kc = new KafkaCluster(kafkaParams)
-    def getLatestOffset(): Long = {
-      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
-    }
-
-    // Send some initial messages before starting context
-    kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
-      assert(getLatestOffset() >= 10)
-    }
-    val offsetBeforeStart = getLatestOffset()
-
-    // Setup context and kafka stream with largest offset
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
-        ssc, kafkaParams, Map(topicPartition -> 11L),
-        (m: MessageAndMetadata[String, String]) => m.message())
-    }
-    assert(
-      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
-        .fromOffsets(topicPartition) >= offsetBeforeStart,
-      "Start offset not from latest"
-    )
-
-    val collectedData = new ConcurrentLinkedQueue[String]()
-    stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    val newData = Map("b" -> 10)
-    kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      collectedData.contains("b")
-    }
-    assert(!collectedData.contains("a"))
-  }
-
-  // Test to verify the offset ranges can be recovered from the checkpoints
-  test("offset recovery") {
-    val topic = "recovery"
-    kafkaTestUtils.createTopic(topic)
-    testDir = Utils.createTempDir()
-
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
-      val strings = data.map { _.toString}
-      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
-    }
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(100))
-    val kafkaStream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
-    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
-      Some(values.sum + state.getOrElse(0))
-    }
-    ssc.checkpoint(testDir.getAbsolutePath)
-
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
-
-    // This is ensure all the data is eventually receiving only once
-    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
-    }
-    ssc.start()
-
-    // Send some data and wait for them to be received
-    for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
-    }
-
-    ssc.stop()
-
-    // Verify that offset ranges were generated
-    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
-    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
-    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
-    // contain something not in "offsetRangesAfterStop".
-    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
-    assert(
-      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
-      "starting offset not zero"
-    )
-
-    logInfo("====== RESTARTING ========")
-
-    // Recover context from checkpoints
-    ssc = new StreamingContext(testDir.getAbsolutePath)
-    val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
-
-    // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
-    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
-    assert(
-      recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
-      },
-      "Recovered ranges are not the same as the ones generated\n" +
-        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
-    )
-    // Restart context, give more data and verify the total at the end
-    // If the total is write that means each records has been received only once
-    ssc.start()
-    sendDataAndWaitForReceive(11 to 20)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
-    }
-    ssc.stop()
-  }
-
-  test("Direct Kafka stream report input information") {
-    val topic = "report-test"
-    val data = Map("a" -> 7, "b" -> 9)
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, data)
-
-    val totalSent = data.values.sum
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    import DirectKafkaStreamSuite._
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val collector = new InputInfoCollector
-    ssc.addStreamingListener(collector)
-
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-
-    val allReceived = new ConcurrentLinkedQueue[(String, String)]
-
-    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" +
-          allReceived.asScala.mkString("\n"))
-
-      // Calculate all the record number collected in the StreamingListener.
-      assert(collector.numRecordsSubmitted.get() === totalSent)
-      assert(collector.numRecordsStarted.get() === totalSent)
-      assert(collector.numRecordsCompleted.get() === totalSent)
-    }
-    ssc.stop()
-  }
-
-  test("maxMessagesPerPartition with backpressure disabled") {
-    val topic = "maxMessagesPerPartition"
-    val kafkaStream = getDirectKafkaStream(topic, None)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
-    assert(kafkaStream.maxMessagesPerPartition(input).get ==
-      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
-  }
-
-  test("maxMessagesPerPartition with no lag") {
-    val topic = "maxMessagesPerPartition"
-    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
-    val kafkaStream = getDirectKafkaStream(topic, rateController)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
-    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
-  }
-
-  test("maxMessagesPerPartition respects max rate") {
-    val topic = "maxMessagesPerPartition"
-    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
-    val kafkaStream = getDirectKafkaStream(topic, rateController)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
-    assert(kafkaStream.maxMessagesPerPartition(input).get ==
-      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
-  }
-
-  test("using rate controller") {
-    val topic = "backpressure"
-    val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    val batchIntervalMilliseconds = 100
-    val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
-    kafkaTestUtils.sendMessages(topic, messages)
-
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-
-    val kafkaStream = withClue("Error creating direct stream") {
-      val kc = new KafkaCluster(kafkaParams)
-      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(topicPartitions)
-        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
-
-      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-          ssc, kafkaParams, m, messageHandler) {
-        override protected[streaming] val rateController =
-          Some(new DirectKafkaRateController(id, estimator))
-      }
-    }
-
-    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
-
-    // Used for assertion failure messages.
-    def dataToString: String =
-      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
-
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      collectedData.add(data)
-    }
-
-    ssc.start()
-
-    // Try different rate limits.
-    // Wait for arrays of data to appear matching the rate.
-    Seq(100, 50, 20).foreach { rate =>
-      collectedData.clear()       // Empty this buffer on each pass.
-      estimator.updateRate(rate)  // Set a new rate.
-      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
-      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
-        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
-        // Funky "-" in message makes the complete assertion message read better.
-        assert(collectedData.asScala.exists(_.size == expectedSize),
-          s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
-      }
-    }
-
-    ssc.stop()
-  }
-
-  /** Get the generated offset ranges from the DirectKafkaStream */
-  private def getOffsetRanges[K, V](
-      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
-    kafkaStream.generatedRDDs.mapValues { rdd =>
-      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
-    }.toSeq.sortBy { _._1 }
-  }
-
-  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
-    val batchIntervalMilliseconds = 100
-
-    val sparkConf = new SparkConf()
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-
-    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
-    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-      ssc, Map[String, String](), earliestOffsets, messageHandler) {
-      override protected[streaming] val rateController = mockRateController
-    }
-  }
-}
-
-object DirectKafkaStreamSuite {
-  val collectedData = new ConcurrentLinkedQueue[String]()
-  @volatile var total = -1L
-
-  class InputInfoCollector extends StreamingListener {
-    val numRecordsSubmitted = new AtomicLong(0L)
-    val numRecordsStarted = new AtomicLong(0L)
-    val numRecordsCompleted = new AtomicLong(0L)
-
-    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
-      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
-    }
-
-    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
-      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
-    }
-
-    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
-      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
-    }
-  }
-}
-
-private[streaming] class ConstantEstimator(@volatile private var rate: Long)
-  extends RateEstimator {
-
-  def updateRate(newRate: Long): Unit = {
-    rate = newRate
-  }
-
-  def compute(
-      time: Long,
-      elements: Long,
-      processingDelay: Long,
-      schedulingDelay: Long): Option[Double] = Some(rate)
-}
-
-private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
-  extends RateController(id, estimator) {
-  override def publish(rate: Long): Unit = ()
-  override def getLatestRate(): Long = rate
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
deleted file mode 100644
index d66830c..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.SparkFunSuite
-
-class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
-  private val topic = "kcsuitetopic" + Random.nextInt(10000)
-  private val topicAndPartition = TopicAndPartition(topic, 0)
-  private var kc: KafkaCluster = null
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll() {
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
-    kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
-  }
-
-  override def afterAll() {
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown()
-      kafkaTestUtils = null
-    }
-  }
-
-  test("metadata apis") {
-    val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
-    val leaderAddress = s"${leader._1}:${leader._2}"
-    assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
-
-    val parts = kc.getPartitions(Set(topic)).right.get
-    assert(parts(topicAndPartition), "didn't get partitions")
-
-    val err = kc.getPartitions(Set(topic + "BAD"))
-    assert(err.isLeft, "getPartitions for a nonexistant topic should be an error")
-  }
-
-  test("leader offset apis") {
-    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
-    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
-
-    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
-    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
-  }
-
-  test("consumer offset apis") {
-    val group = "kcsuitegroup" + Random.nextInt(10000)
-
-    val offset = Random.nextInt(10000)
-
-    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
-    assert(set.isRight, "didn't set consumer offsets")
-
-    val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
-    assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
deleted file mode 100644
index 5e539c1..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark._
-
-class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  private val sparkConf = new SparkConf().setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-  private var sc: SparkContext = _
-
-  override def beforeAll {
-    sc = new SparkContext(sparkConf)
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll {
-    if (sc != null) {
-      sc.stop
-      sc = null
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown()
-      kafkaTestUtils = null
-    }
-  }
-
-  test("basic usage") {
-    val topic = s"topicbasic-${Random.nextInt}"
-    kafkaTestUtils.createTopic(topic)
-    val messages = Array("the", "quick", "brown", "fox")
-    kafkaTestUtils.sendMessages(topic, messages)
-
-    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
-
-    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
-
-    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
-      sc, kafkaParams, offsetRanges)
-
-    val received = rdd.map(_._2).collect.toSet
-    assert(received === messages.toSet)
-
-    // size-related method optimizations return sane results
-    assert(rdd.count === messages.size)
-    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
-    assert(!rdd.isEmpty)
-    assert(rdd.take(1).size === 1)
-    assert(rdd.take(1).head._2 === messages.head)
-    assert(rdd.take(messages.size + 10).size === messages.size)
-
-    val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
-      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
-
-    assert(emptyRdd.isEmpty)
-
-    // invalid offset ranges throw exceptions
-    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
-    intercept[SparkException] {
-      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
-        sc, kafkaParams, badRanges)
-    }
-  }
-
-  test("iterator boundary conditions") {
-    // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-    val topic = s"topicboundary-${Random.nextInt}"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    kafkaTestUtils.createTopic(topic)
-
-    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
-
-    val kc = new KafkaCluster(kafkaParams)
-
-    // this is the "lots of messages" case
-    kafkaTestUtils.sendMessages(topic, sent)
-    val sentCount = sent.values.sum
-
-    // rdd defined from leaders after sending messages, should get the number sent
-    val rdd = getRdd(kc, Set(topic))
-
-    assert(rdd.isDefined)
-
-    val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
-    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
-
-    assert(rangeCount === sentCount, "offset range didn't include all sent messages")
-    assert(rdd.get.count === sentCount, "didn't get all sent messages")
-
-    val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
-
-    // make sure consumer offsets are committed before the next getRdd call
-    kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
-      err => throw new Exception(err.mkString("\n")),
-      _ => ()
-    )
-
-    // this is the "0 messages" case
-    val rdd2 = getRdd(kc, Set(topic))
-    // shouldn't get anything, since message is sent after rdd was defined
-    val sentOnlyOne = Map("d" -> 1)
-
-    kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-
-    assert(rdd2.isDefined)
-    assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
-
-    // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
-    val rdd3 = getRdd(kc, Set(topic))
-    // send lots of messages after rdd was defined, they shouldn't show up
-    kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
-
-    assert(rdd3.isDefined)
-    assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
-
-  }
-
-  // get an rdd from the committed consumer offsets until the latest leader offsets,
-  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
-    val groupId = kc.kafkaParams("group.id")
-    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
-      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
-        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
-          offs.map(kv => kv._1 -> kv._2.offset)
-        }
-      )
-    }
-    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
-      consumerOffsets(topicPartitions).flatMap { from =>
-        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
-          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
-              OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
-          }.toArray
-
-          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
-              tp -> Broker(lo.host, lo.port)
-          }.toMap
-
-          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
-            sc, kc.kafkaParams, offsetRanges, leaders,
-            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
deleted file mode 100644
index 6a35ac1..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
-  private var ssc: StreamingContext = _
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll(): Unit = {
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown()
-      kafkaTestUtils = null
-    }
-  }
-
-  test("Kafka input stream") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    val topic = "topic1"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, sent)
-
-    val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
-      "auto.offset.reset" -> "smallest")
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map(_._2).countByValue().foreachRDD { r =>
-      r.collect().foreach { kv =>
-        result.synchronized {
-          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-          result.put(kv._1, count)
-        }
-      }
-    }
-
-    ssc.start()
-
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(result.synchronized { sent === result })
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
deleted file mode 100644
index 7b9aee3..0000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
-
-class ReliableKafkaStreamSuite extends SparkFunSuite
-    with BeforeAndAfterAll with BeforeAndAfter with Eventually {
-
-  private val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-  private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  private var groupId: String = _
-  private var kafkaParams: Map[String, String] = _
-  private var ssc: StreamingContext = _
-  private var tempDirectory: File = null
-
-  override def beforeAll(): Unit = {
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-
-    groupId = s"test-consumer-${Random.nextInt(10000)}"
-    kafkaParams = Map(
-      "zookeeper.connect" -> kafkaTestUtils.zkAddress,
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    tempDirectory = Utils.createTempDir()
-  }
-
-  override def afterAll(): Unit = {
-    Utils.deleteRecursively(tempDirectory)
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown()
-      kafkaTestUtils = null
-    }
-  }
-
-  before {
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    ssc.checkpoint(tempDirectory.getAbsolutePath)
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-  }
-
-  test("Reliable Kafka input stream with single topic") {
-    val topic = "test-topic"
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, data)
-
-    // Verify whether the offset of this group/topic/partition is 0 before starting.
-    assert(getCommitOffset(groupId, topic, 0) === None)
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map { case (k, v) => v }.foreachRDD { r =>
-        val ret = r.collect()
-        ret.foreach { v =>
-          val count = result.getOrElseUpdate(v, 0) + 1
-          result.put(v, count)
-        }
-      }
-    ssc.start()
-
-    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
-      // A basic process verification for ReliableKafkaReceiver.
-      // Verify whether received message number is equal to the sent message number.
-      assert(data.size === result.size)
-      // Verify whether each message is the same as the data to be verified.
-      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
-      // Verify the offset number whether it is equal to the total message number.
-      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
-    }
-  }
-
-  test("Reliable Kafka input stream with multiple topics") {
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    topics.foreach { case (t, _) =>
-      kafkaTestUtils.createTopic(t)
-      kafkaTestUtils.sendMessages(t, data)
-    }
-
-    // Before started, verify all the group/topic/partition offsets are 0.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
-
-    // Consuming all the data sent to the broker which will potential commit the offsets internally.
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
-    stream.foreachRDD(_ => Unit)
-    ssc.start()
-
-    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
-      // Verify the offset for each group/topic to see whether they are equal to the expected one.
-      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
-    }
-  }
-
-
-  /** Getting partition offset from Zookeeper. */
-  private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
-    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
-    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e783fa..d71913c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,8 +108,8 @@
     <module>examples</module>
     <module>repl</module>
     <module>launcher</module>
-    <module>external/kafka</module>
-    <module>external/kafka-assembly</module>
+    <module>external/kafka-0-8</module>
+    <module>external/kafka-0-8-assembly</module>
   </modules>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/project/MimaBuild.scala
----------------------------------------------------------------------
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 3dc1cea..2a989dd 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -89,7 +89,15 @@ object MimaBuild {
   def mimaSettings(sparkHome: File, projectRef: ProjectRef) = {
     val organization = "org.apache.spark"
     val previousSparkVersion = "1.6.0"
-    val fullId = "spark-" + projectRef.project + "_2.11"
+    // This check can be removed post-2.0
+    val project = if (previousSparkVersion == "1.6.0" &&
+      projectRef.project == "streaming-kafka-0-8"
+    ) {
+      "streaming-kafka"
+    } else {
+      projectRef.project
+    }
+    val fullId = "spark-" + project + "_2.11"
     mimaDefaultSettings ++
     Seq(previousArtifact := Some(organization % fullId % previousSparkVersion),
       binaryIssueFilters ++= ignoredABIProblems(sparkHome, version.value))

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f50f41a..d83afa0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -46,7 +46,7 @@ object BuildCommons {
   val streamingProjects@Seq(
     streaming, streamingFlumeSink, streamingFlume, streamingKafka
   ) = Seq(
-    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka"
+    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8"
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
@@ -62,7 +62,7 @@ object BuildCommons {
       "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
-    Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
+    Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -581,8 +581,8 @@ object Assembly {
         
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
-        // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+        // This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {
         s"${mName}-${v}-hadoop${hv}.jar"

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 02a8869..015ca77 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -208,13 +208,13 @@ 
________________________________________________________________________________
   1. Include the Kafka library and its dependencies with in the
      spark-submit command as
 
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:%s ...
 
   2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = %s.
      Then, include the jar in the spark-submit command as
 
-     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...
 
 
________________________________________________________________________________________________
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f27628c..360ba1e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1476,13 +1476,13 @@ def search_jar(dir, name_prefix):
 
 def search_kafka_assembly_jar():
     SPARK_HOME = os.environ["SPARK_HOME"]
-    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
-    jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly")
+    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-0-8-assembly")
+    jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-0-8-assembly")
     if not jars:
         raise Exception(
             ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
             "You need to build Spark with "
-            "'build/sbt assembly/package streaming-kafka-assembly/assembly' or "
+            "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or "
             "'build/mvn package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to