Repository: spark
Updated Branches:
  refs/heads/master 54d13bed8 -> 24587ce43


http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
new file mode 100644
index 0000000..7dc9606
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, 
Status}
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, 
Channels}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
+
+/**
+ * A [[ReceiverInputDStream]] whose receiver is a [[FlumeReceiver]] created from the
+ * constructor arguments of this stream.
+ * @param _ssc Streaming context passed on to the parent [[ReceiverInputDStream]]
+ * @param host Host name handed to the receiver
+ * @param port Port handed to the receiver
+ * @param storageLevel Storage level handed to the receiver
+ * @param enableDecompression Decompression flag handed to the receiver
+ * @tparam T Class type parameter (not referenced inside this class)
+ */
+private[streaming]
+class FlumeInputDStream[T: ClassTag](
+    _ssc: StreamingContext,
+    host: String,
+    port: Int,
+    storageLevel: StorageLevel,
+    enableDecompression: Boolean
+  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+  /** Builds the receiver that feeds this stream. */
+  override def getReceiver(): Receiver[SparkFlumeEvent] =
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
+}
+
+/**
+ * A wrapper class for AvroFlumeEvent's with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ *
+ * The format written by `writeExternal` and read back by `readExternal` is, in order:
+ * an Int with the body length, the body itself, an Int with the number of headers, and
+ * then for every header an Int length plus bytes for the key followed by an Int length
+ * plus bytes for the value (keys and values go through Utils.serialize / Utils.deserialize).
+ */
+class SparkFlumeEvent() extends Externalizable {
+  var event: AvroFlumeEvent = new AvroFlumeEvent()
+
+  /* De-serialize from bytes: the body first, then the headers, then both are set on `event`. */
+  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+    val bodyLength = in.readInt()
+    val bodyBuff = new Array[Byte](bodyLength)
+    in.readFully(bodyBuff)
+
+    val numHeaders = in.readInt()
+    val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+    for (i <- 0 until numHeaders) {
+      val keyLength = in.readInt()
+      val keyBuff = new Array[Byte](keyLength)
+      in.readFully(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
+
+      val valLength = in.readInt()
+      val valBuff = new Array[Byte](valLength)
+      in.readFully(valBuff)
+      val value: String = Utils.deserialize(valBuff)
+
+      headers.put(key, value)
+    }
+
+    event.setBody(ByteBuffer.wrap(bodyBuff))
+    event.setHeaders(headers)
+  }
+
+  /* Serialize to bytes: mirrors readExternal, so the write order here must not change. */
+  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+    val body = event.getBody
+    out.writeInt(body.remaining())
+    Utils.writeByteBuffer(body, out)
+
+    val numHeaders = event.getHeaders.size()
+    out.writeInt(numHeaders)
+    for ((k, v) <- event.getHeaders.asScala) {
+      val keyBuff = Utils.serialize(k.toString)
+      out.writeInt(keyBuff.length)
+      out.write(keyBuff)
+      val valBuff = Utils.serialize(v.toString)
+      out.writeInt(valBuff.length)
+      out.write(valBuff)
+    }
+  }
+}
+
+private[streaming] object SparkFlumeEvent {
+  /** Returns a new [[SparkFlumeEvent]] whose `event` field is the given Avro event. */
+  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
+    val wrapper = new SparkFlumeEvent
+    wrapper.event = in
+    wrapper
+  }
+}
+
+/**
+ * A simple server that implements Flume's Avro protocol by handing every incoming
+ * event to the given receiver.
+ */
+private[streaming]
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+
+  /** Wraps a single event, stores it in the receiver and answers `Status.OK`. */
+  override def append(event: AvroFlumeEvent): Status = {
+    val wrapped = SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.store(wrapped)
+    Status.OK
+  }
+
+  /** Wraps and stores the events of the batch one by one, then answers `Status.OK`. */
+  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
+    for (event <- events.asScala) {
+      receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
+    }
+    Status.OK
+  }
+}
+
+/**
+ * A NetworkReceiver which listens for events using the Flume Avro interface.
+ * @param host Host name the Avro server is bound to; also returned as the preferred location
+ * @param port Port the Avro server is bound to
+ * @param storageLevel Storage level passed to the parent [[Receiver]]
+ * @param enableDecompression Whether the server is built with the zlib pipeline factory
+ */
+private[streaming]
+class FlumeReceiver(
+    host: String,
+    port: Int,
+    storageLevel: StorageLevel,
+    enableDecompression: Boolean
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+  // Lazy, so it is only built when initServer() first needs it.
+  lazy val responder = new SpecificResponder(
+    classOf[AvroSourceProtocol], new FlumeEventServer(this))
+  // Assigned by onStart() and reset to null by onStop().
+  var server: NettyServer = null
+
+  /** Creates (without starting) the Netty server, using the zlib pipeline when enabled. */
+  private def initServer(): NettyServer = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
+
+  /** Creates and starts the server unless one already exists, in which case it only warns. */
+  def onStart(): Unit = {
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more than once without close")
+      }
+    }
+    logInfo("Flume receiver started")
+  }
+
+  /** Closes the server, if there is one, and clears the reference so onStart() can run again. */
+  def onStop(): Unit = {
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
+    logInfo("Flume receiver stopped")
+  }
+
+  override def preferredLocation: Option[String] = Option(host)
+
+  /**
+   * A Netty Pipeline factory that will decompress incoming data from
+   * the Netty client and compress data going back to the client.
+   *
+   * The compression on the return is required because Flume requires
+   * a successful response to indicate it can remove the event/batch
+   * from the configured channel
+   */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+    def getPipeline(): ChannelPipeline = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      // Both handlers are added at the front, so "inflater" ends up ahead of "deflater".
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000..250bfc1
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum size of a batch
+ * @param parallelism Parallelism handed to the [[FlumePollingReceiver]]
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+    _ssc: StreamingContext,
+    val addresses: Seq[InetSocketAddress],
+    val maxBatchSize: Int,
+    val parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+  /** Builds the polling receiver that feeds this stream. */
+  override def getReceiver(): Receiver[SparkFlumeEvent] =
+    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+}
+
+/**
+ * A receiver that opens one connection per given address and runs `parallelism` worker
+ * tasks (instances of FlumeBatchFetcher) on a fixed-size thread pool.
+ * @param addresses Addresses to open a transceiver to when the receiver starts
+ * @param maxBatchSize Value exposed to the workers through `getMaxBatchSize`
+ * @param parallelism Size of the worker thread pool and number of worker tasks submitted
+ * @param storageLevel Storage level passed to the parent [[Receiver]]
+ */
+private[streaming] class FlumePollingReceiver(
+    addresses: Seq[InetSocketAddress],
+    maxBatchSize: Int,
+    parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+  // One daemon thread pool is used for both executor arguments of the channel factory.
+  lazy val channelFactoryExecutor =
+    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+      setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+  lazy val channelFactory =
+    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+  lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+  private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+  /** Opens a connection to every address and submits the worker tasks. */
+  override def onStart(): Unit = {
+    // Create the connections to each Flume agent.
+    addresses.foreach { address =>
+      val transceiver = new NettyTransceiver(address, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      connections.add(new FlumeConnection(transceiver, client))
+    }
+    // Logged once for the whole pool rather than once per submitted worker.
+    logInfo("Starting Flume Polling Receiver worker threads..")
+    for (i <- 0 until parallelism) {
+      // Threads that pull data from Flume.
+      receiverExecutor.submit(new FlumeBatchFetcher(this))
+    }
+  }
+
+  /**
+   * Shuts the worker pool down, then closes all connections and releases the channel
+   * factory. The cleanup steps sit in `finally` blocks so that a failure in an earlier
+   * step (for example an interrupt while waiting for the workers) does not skip them.
+   */
+  override def onStop(): Unit = {
+    logInfo("Shutting down Flume Polling Receiver")
+    try {
+      receiverExecutor.shutdown()
+      // Wait up to a minute for the threads to die
+      if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+        receiverExecutor.shutdownNow()
+      }
+    } finally {
+      try {
+        connections.asScala.foreach(_.transceiver.close())
+      } finally {
+        channelFactory.releaseExternalResources()
+      }
+    }
+  }
+
+  private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+    this.connections
+  }
+
+  private[flume] def getMaxBatchSize: Int = {
+    this.maxBatchSize
+  }
+}
+
+/**
+ * Pairs a transceiver with the Avro IPC client that talks through it.
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private[flume] class FlumeConnection(
+    val transceiver: NettyTransceiver,
+    val client: SparkFlumeProtocol.Callback)
+
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000..945cfa7
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList}
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Shared code for Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+  // Transceiver opened by the latest writeInput() call; null when none is open.
+  private var transceiver: NettyTransceiver = null
+
+  private val testPort: Int = findFreePort()
+
+  def getTestPort(): Int = testPort
+
+  /** Find a free port, starting the search from a random candidate */
+  private def findFreePort(): Int = {
+    val firstCandidate = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(firstCandidate, (trialPort: Int) => {
+      // Opening and immediately closing a server socket is the whole "service".
+      new ServerSocket(trialPort).close()
+      (null, trialPort)
+    }, new SparkConf())._2
+  }
+
+  /** Send data to the flume receiver */
+  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+    val receiverAddress = new InetSocketAddress("localhost", testPort)
+
+    // One Avro event per input string, each carrying the same single header.
+    val batch = input.asScala.map { item =>
+      val event = new AvroFlumeEvent
+      event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
+      event.setHeaders(Collections.singletonMap("test", "header"))
+      event
+    }
+
+    // if last attempted transceiver had succeeded, close it
+    close()
+
+    // Create transceiver
+    transceiver =
+      if (enableCompression) {
+        new NettyTransceiver(receiverAddress, new CompressionChannelFactory(6))
+      } else {
+        new NettyTransceiver(receiverAddress)
+      }
+
+    // Create Avro client with the transceiver
+    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+    if (client == null) {
+      throw new AssertionError("Cannot create client")
+    }
+
+    // Send data
+    if (client.appendBatch(batch.asJava) != avro.Status.OK) {
+      throw new AssertionError("Sent events unsuccessfully")
+    }
+  }
+
+  /** Closes the current transceiver, if any, and forgets it. */
+  def close(): Unit = {
+    Option(transceiver).foreach { open =>
+      open.close()
+      transceiver = null
+    }
+  }
+
+  /** Class to create socket channel with compression */
+  private class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
new file mode 100644
index 0000000..3e3ed71
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.net.InetSocketAddress
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaPairDStream, 
JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object FlumeUtils {
+  private val DEFAULT_POLLING_PARALLELISM = 5
+  private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
+  /**
+   * Create an input stream from a Flume source, with decompression disabled.
+   * @param ssc      StreamingContext object
+   * @param hostname Hostname of the machine to which the flume data will be sent
+   * @param port     Port of the machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(ssc, hostname, port, storageLevel, enableDecompression = false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc      StreamingContext object
+   * @param hostname Hostname of the machine to which the flume data will be sent
+   * @param port     Port of the machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  should netty server decompress input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param jssc     JavaStreamingContext object
+   * @param hostname Hostname of the machine to which the flume data will be sent
+   * @param port     Port of the machine to which the flume data will be sent
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port)
+  }
+
+  /**
+   * Creates an input stream from a Flume source, with decompression disabled.
+   * @param jssc     JavaStreamingContext object
+   * @param hostname Hostname of the machine to which the flume data will be sent
+   * @param port     Port of the machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression = false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param jssc     JavaStreamingContext object
+   * @param hostname Hostname of the machine to which the flume data will be sent
+   * @param port     Port of the machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  should netty server decompress input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param ssc StreamingContext object
+   * @param hostname Address of the host on which the Spark Sink is running
+   * @param port Port of the host at which the Spark Sink is listening
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val sinkAddresses = Seq(new InetSocketAddress(hostname, port))
+    createPollingStream(ssc, sinkAddresses, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param ssc StreamingContext object
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      ssc: StreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(
+      ssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * @param ssc StreamingContext object
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+   *                     single RPC call
+   * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+   *                    that having a higher number of requests concurrently being pulled will
+   *                    result in this stream using more threads
+   */
+  def createPollingStream(
+      ssc: StreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    new FlumePollingInputDStream[SparkFlumeEvent](
+      ssc, addresses, maxBatchSize, parallelism, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * Storage level of the data will be StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param jssc     JavaStreamingContext object
+   * @param hostname Hostname of the host on which the Spark Sink is running
+   * @param port     Port of the host at which the Spark Sink is listening
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param jssc         JavaStreamingContext object
+   * @param hostname     Hostname of the host on which the Spark Sink is running
+   * @param port         Port of the host at which the Spark Sink is listening
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    val sinkAddresses = Array(new InetSocketAddress(hostname, port))
+    createPollingStream(jssc, sinkAddresses, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param jssc         JavaStreamingContext object
+   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      addresses: Array[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(
+      jssc, addresses, storageLevel, DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * @param jssc         JavaStreamingContext object
+   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+   *                     single RPC call
+   * @param parallelism  Number of concurrent requests this stream should send to the sink. Note
+   *                     that having a higher number of requests concurrently being pulled will
+   *                     result in this stream using more threads
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      addresses: Array[InetSocketAddress],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+  }
+}
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class
+ * and function so that it can be easily instantiated and called from Python's FlumeUtils.
+ * Both methods return the stream converted to pairs of byte arrays.
+ */
+private[flume] class FlumeUtilsPythonHelper {
+
+  /** Calls `FlumeUtils.createStream` and converts the result to byte-array pairs. */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    val stream =
+      FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(stream)
+  }
+
+  /**
+   * Calls `FlumeUtils.createPollingStream` with one address per (host, port) pair and
+   * converts the result to byte-array pairs. `hosts` and `ports` must have the same size.
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hosts: JList[String],
+      ports: JList[Int],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    assert(hosts.size() == ports.size())
+    val hostPortPairs = hosts.asScala.zip(ports.asScala)
+    val addresses = hostPortPairs.map { pair =>
+      new InetSocketAddress(pair._1, pair._2)
+    }
+    val stream = FlumeUtils.createPollingStream(
+      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+    FlumeUtilsPythonHelper.toByteArrayPairDStream(stream)
+  }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+  /**
+   * Encodes a header map as an Int entry count followed by every key and value, each
+   * written with `PythonRDD.writeUTF`.
+   */
+  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream()
+    val output = new DataOutputStream(byteStream)
+    try {
+      output.writeInt(map.size)
+      map.asScala.foreach { kv =>
+        PythonRDD.writeUTF(kv._1.toString, output)
+        PythonRDD.writeUTF(kv._2.toString, output)
+      }
+      byteStream.toByteArray
+    }
+    finally {
+      output.close()
+    }
+  }
+
+  /**
+   * Maps every event of the stream to a pair of (encoded headers, body bytes).
+   */
+  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+    JavaPairDStream[Array[Byte], Array[Byte]] = {
+    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+        val event = sparkEvent.event
+        // Read through a duplicate: a relative get() on the event's own buffer would move
+        // its position, and a later read of the same event object would see an empty body.
+        val byteBuffer = event.getBody.duplicate()
+        val body = new Array[Byte](byteBuffer.remaining())
+        byteBuffer.get(body)
+        (stringMapToByteArray(event.getHeaders), body)
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000..1a96df6
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.nio.charset.StandardCharsets
+import java.util.{Collections, List => JList, Map => JMap}
+import java.util.concurrent._
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
+
+/**
+ * Shared code for the Scala and Python unit tests.
+ */
+private[flume] class PollingFlumeTestUtils {
+
+  // Number of transactions each TxnSubmitter writes to its channel
+  private val batchCount = 5
+  // Number of events put into a channel within one transaction
+  val eventsPerBatch = 100
+  private val totalEventsPerChannel = batchCount * eventsPerBatch
+  // Passed as the "capacity" setting of every MemoryChannel started by this class
+  private val channelCapacity = 5000
+
+  /** Number of events expected in total across all currently tracked channels. */
+  def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+  // Channels and sinks created by the most recent start* call; emptied again by close()
+  private val channels = new ArrayBuffer[MemoryChannel]
+  private val sinks = new ArrayBuffer[SparkSink]
+
+  /**
+   * Starts one SparkSink backed by a newly configured MemoryChannel, replacing any
+   * previously tracked channels and sinks.
+   *
+   * @return the port reported by the started sink
+   */
+  def startSingleSink(): Int = {
+    channels.clear()
+    sinks.clear()
+
+    // Channel settings go in first; the sink settings are added to the same context later.
+    val flumeContext = new Context()
+    flumeContext.put("capacity", channelCapacity.toString)
+    flumeContext.put("transactionCapacity", "1000")
+    flumeContext.put("keep-alive", "0")
+    val memoryChannel = new MemoryChannel()
+    Configurables.configure(memoryChannel, flumeContext)
+
+    flumeContext.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    flumeContext.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+    val sparkSink = new SparkSink()
+    Configurables.configure(sparkSink, flumeContext)
+    sparkSink.setChannel(memoryChannel)
+    sparkSink.start()
+
+    channels += memoryChannel
+    sinks += sparkSink
+
+    sparkSink.getPort()
+  }
+
+  /**
+   * Starts two SparkSinks, each attached to its own MemoryChannel, replacing any previously
+   * tracked channels and sinks.
+   *
+   * @return the ports reported by the two sinks, in the order the sinks were started
+   */
+  def startMultipleSinks(): Seq[Int] = {
+    channels.clear()
+    sinks.clear()
+
+    val flumeContext = new Context()
+    flumeContext.put("capacity", channelCapacity.toString)
+    flumeContext.put("transactionCapacity", "1000")
+    flumeContext.put("keep-alive", "0")
+
+    // Both channels are configured before any sink setting is added to the context.
+    val newChannels = Seq.fill(2) {
+      val memoryChannel = new MemoryChannel()
+      Configurables.configure(memoryChannel, flumeContext)
+      memoryChannel
+    }
+
+    flumeContext.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    flumeContext.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+    // One sink per channel, configured, wired up and started in channel order.
+    val newSinks = newChannels.map { memoryChannel =>
+      val sparkSink = new SparkSink()
+      Configurables.configure(sparkSink, flumeContext)
+      sparkSink.setChannel(memoryChannel)
+      sparkSink.start()
+      sparkSink
+    }
+
+    sinks ++= newSinks
+    channels ++= newChannels
+
+    sinks.map(_.getPort())
+  }
+
+  /**
+   * Sends the test events to every tracked channel and waits for the sinks to report them.
+   *
+   * One TxnSubmitter per channel is run on a thread pool. After all submitters have
+   * finished, this method waits up to 15 seconds on a latch that was registered with each
+   * sink through `countdownWhenBatchReceived`. The result of that wait is not checked here.
+   */
+  def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
+    val executor = Executors.newCachedThreadPool()
+    try {
+      val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+      val latch = new CountDownLatch(batchCount * channels.size)
+      sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+      channels.foreach { channel =>
+        executorCompletion.submit(new TxnSubmitter(channel))
+      }
+
+      // Block until every submitted task has completed.
+      channels.foreach(_ => executorCompletion.take())
+
+      latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+    } finally {
+      // Release the pool's threads instead of leaving them to idle out after each call.
+      executor.shutdown()
+    }
+  }
+
+  /**
+   * A Python-friendly method to assert the output.
+   *
+   * Checks that the number of received events equals the number of events sent, and that
+   * every expected event (body "<channel name>-i" with the single header "test-i" -> "header")
+   * appears among the received events.
+   *
+   * @param outputHeaders headers of the received events
+   * @param outputBodies bodies of the received events, index-aligned with `outputHeaders`
+   * @throws IllegalArgumentException if the two lists differ in size
+   * @throws AssertionError if the event count is wrong or an expected event is missing
+   */
+  def assertOutput(
+      outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+    require(outputHeaders.size == outputBodies.size)
+    val eventSize = outputHeaders.size
+    val expectedEvents = totalEventsPerChannel * channels.size
+    if (eventSize != expectedEvents) {
+      throw new AssertionError(s"Expected $expectedEvents events, but was $eventSize")
+    }
+    // Every (body, header) combination that the submitters are expected to have produced.
+    val expected = for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) yield {
+      val header: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
+      (s"${channels(k).getName}-$i", header)
+    }
+    // Count the expected events for which some received event has the same body and header.
+    val matched = expected.count { case (body, header) =>
+      (0 until eventSize).exists { j =>
+        body == outputBodies.get(j) && header == outputHeaders.get(j)
+      }
+    }
+    if (matched != expectedEvents) {
+      throw new AssertionError(
+        s"Expected $expectedEvents events with matching body and header, but found $matched")
+    }
+  }
+
+  /** Runs `assertChannelIsEmpty` on every tracked channel. */
+  def assertChannelsAreEmpty(): Unit = {
+    for (channel <- channels) {
+      assertChannelIsEmpty(channel)
+    }
+  }
+
+  /**
+   * Reflectively reads the channel's `queueRemaining` field and checks that its
+   * `availablePermits` value equals the capacity the channel was configured with.
+   *
+   * @param channel the channel to inspect
+   * @throws AssertionError if the available permits differ from `channelCapacity`
+   */
+  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+    val queueRemainingField = channel.getClass.getDeclaredField("queueRemaining")
+    queueRemainingField.setAccessible(true)
+    val queueRemaining = queueRemainingField.get(channel)
+    val availablePermits = queueRemaining.getClass.getDeclaredMethod("availablePermits")
+    // Compare with the configured capacity rather than repeating its value as a literal.
+    if (availablePermits.invoke(queueRemaining).asInstanceOf[Int] != channelCapacity) {
+      throw new AssertionError(s"Channel ${channel.getName} is not empty")
+    }
+  }
+
+  /** Stops and forgets every tracked sink, then stops and forgets every tracked channel. */
+  def close(): Unit = {
+    for (sink <- sinks) sink.stop()
+    sinks.clear()
+    for (channel <- channels) channel.stop()
+    channels.clear()
+  }
+
+  /**
+   * Task that writes `batchCount` transactions of `eventsPerBatch` events each to the given
+   * channel. The n-th event has the body "<channel name>-n" and the single header
+   * "test-n" -> "header". The task sleeps 500 ms after each transaction.
+   */
+  private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+    override def call(): Void = {
+      for (batch <- 0 until batchCount) {
+        val tx = channel.getTransaction
+        tx.begin()
+        try {
+          for (offset <- 0 until eventsPerBatch) {
+            // Running event number across all batches of this channel
+            val t = batch * eventsPerBatch + offset
+            channel.put(EventBuilder.withBody(
+              s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
+              Collections.singletonMap(s"test-$t", "header")))
+          }
+          tx.commit()
+        } catch {
+          case e: Throwable =>
+            // Do not leave the transaction open on failure; roll back and rethrow.
+            tx.rollback()
+            throw e
+        } finally {
+          tx.close()
+        }
+        Thread.sleep(500) // Allow some time for the events to reach
+      }
+      null
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
new file mode 100644
index 0000000..d31aa5f
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package org.apache.spark.streaming.flume;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
new file mode 100644
index 0000000..9bfab68
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark streaming receiver for Flume.
+ */
+package object flume

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..cfedb5a
--- /dev/null
+++ 
b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+    /** Streaming context created in {@link #setUp()} and released in {@link #tearDown()}. */
+    protected transient JavaStreamingContext ssc;
+
+    /**
+     * Creates the streaming context on a {@code local[2]} master with the streaming clock
+     * set to {@code ManualClock}, and enables checkpointing to the "checkpoint" directory.
+     */
+    @Before
+    public void setUp() {
+        Duration batchInterval = new Duration(1000);
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+        JavaStreamingContext context = new JavaStreamingContext(conf, batchInterval);
+        ssc = context;
+        ssc.checkpoint("checkpoint");
+    }
+
+    /** Stops the streaming context and drops the reference to it. */
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000..79c5b91
--- /dev/null
+++ 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testFlumeStream() {
+    // tests the API, does not actually test data receiving
+    InetSocketAddress[] addresses = new InetSocketAddress[] {
+        new InetSocketAddress("localhost", 12345)
+    };
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+        FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = 
FlumeUtils.createPollingStream(
+        ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = 
FlumeUtils.createPollingStream(
+        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test4 = 
FlumeUtils.createPollingStream(
+        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000..3b5e0c7
--- /dev/null
+++ 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+  /**
+   * Calls three {@code FlumeUtils.createStream} overloads once each. The returned streams
+   * are not used further.
+   */
+  @Test
+  public void testFlumeStream() {
+    // tests the API, does not actually test data receiving
+    JavaReceiverInputDStream<SparkFlumeEvent> hostPortStream =
+        FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> hostPortLevelStream =
+        FlumeUtils.createStream(
+            ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> uncompressedStream =
+        FlumeUtils.createStream(
+            ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2(), false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume/src/test/resources/log4j.properties 
b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..75e3b53
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p 
%c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
new file mode 100644
index 0000000..c97a27c
--- /dev/null
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
+
+/**
+ * An output stream used only by the test suites. The collected contents of every RDD are
+ * appended to `output`, a ConcurrentLinkedQueue holding one Seq of items per RDD.
+ * The queue is cleared whenever this object is deserialized (see `readObject`).
+ *
+ * @param parent the stream whose RDDs are collected
+ * @param output the queue that receives the collected items; a new empty queue by default
+ */
+class TestOutputStream[T: ClassTag](parent: DStream[T],
+    val output: ConcurrentLinkedQueue[Seq[T]] = new 
ConcurrentLinkedQueue[Seq[T]]())
+  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+    val collected = rdd.collect()
+    output.add(collected)
+  }, false) {
+
+  // This is to clear the output buffer every time it is read from a checkpoint
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream): Unit = 
Utils.tryOrIOException {
+    ois.defaultReadObject()
+    output.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000..1567124
--- /dev/null
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.util.{ManualClock, Utils}
+
+class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with 
Logging {
+
+  // Upper bound on the number of attempts made by testMultipleTimes
+  val maxAttempts = 5
+  // Batch interval of the streaming context; also the amount the manual clock is advanced by
+  val batchDuration = Seconds(1)
+
+  // The streaming clock is set to ManualClock; writeAndVerify advances it explicitly
+  val conf = new SparkConf()
+    .setMaster("local[2]")
+    .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+
+  // Shared helper that starts the sinks and channels and verifies the received events
+  val utils = new PollingFlumeTestUtils
+
+  // Both tests are run through testMultipleTimes, which retries on bind collisions
+  test("flume polling test") {
+    testMultipleTimes(testFlumePolling)
+  }
+
+  test("flume polling test multiple hosts") {
+    testMultipleTimes(testFlumePollingMultipleHost)
+  }
+
+  /**
+   * Runs the given test, retrying while it fails with an exception that
+   * `Utils.isBindCollision` recognizes, for at most `maxAttempts` attempts.
+   * Any other exception propagates immediately.
+   */
+  private def testMultipleTimes(test: () => Unit): Unit = {
+    var attempt = 0
+    var testPassed = false
+    while (attempt < maxAttempts && !testPassed) {
+      testPassed = try {
+        test()
+        true
+      } catch {
+        case e: Exception if Utils.isBindCollision(e) =>
+          logWarning("Exception when running flume polling test: " + e)
+          attempt += 1
+          false
+      }
+    }
+    assert(testPassed, s"Test failed after $attempt attempts!")
+  }
+
+  /** Single-sink scenario: start one sink, send and verify the events, then clean up. */
+  private def testFlumePolling(): Unit = {
+    try {
+      val sinkPort = utils.startSingleSink()
+      writeAndVerify(sinkPort :: Nil)
+      utils.assertChannelsAreEmpty()
+    } finally {
+      utils.close()
+    }
+  }
+
+  /** Multi-sink scenario: start the sinks, send and verify the events, then clean up. */
+  private def testFlumePollingMultipleHost(): Unit = {
+    try {
+      val sinkPorts = utils.startMultipleSinks()
+      writeAndVerify(sinkPorts)
+      utils.assertChannelsAreEmpty()
+    } finally {
+      utils.close()
+    }
+  }
+
+  /**
+   * Creates a polling stream over the given sink ports on localhost, sends the test events,
+   * advances the manual clock by one batch and checks the collected output.
+   *
+   * @param sinkPorts ports of the sinks to poll
+   */
+  def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(
+        ssc, addresses, StorageLevel.MEMORY_AND_DISK, utils.eventsPerBatch, 5)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+    new TestOutputStream(flumeStream, outputQueue).register()
+
+    ssc.start()
+    try {
+      utils.sendDatAndEnsureAllDataHasBeenReceived()
+      val manualClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      manualClock.advance(batchDuration.milliseconds)
+
+      // Retried until the assertion holds, so the whole batch has time to be processed.
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        val events = outputQueue.asScala.toSeq.flatten.map(_.event)
+        val headers = events.map { event =>
+          val stringHeaders = event.getHeaders.asScala.map {
+            case (key, value) => (key.toString, value.toString)
+          }
+          stringHeaders.asJava
+        }
+        val bodies = events.map(event => JavaUtils.bytesToString(event.getBody))
+        utils.assertOutput(headers.asJava, bodies.asJava)
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000..7bac1cc
--- /dev/null
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, 
TestOutputStream}
+
+class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers 
with Logging {
+  val conf = new 
SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+  // Assigned by startContext; stopped in the finally block of testFlumeStream
+  var ssc: StreamingContext = null
+
+  // Same scenario twice: once without and once with compression
+  test("flume input stream") {
+    testFlumeStream(testCompression = false)
+  }
+
+  test("flume input compressed stream") {
+    testFlumeStream(testCompression = true)
+  }
+
+  /**
+   * Starts a Flume stream, writes the strings "1" to "100" to it and checks that the same
+   * bodies arrive in order, each with the header "test" -> "header".
+   *
+   * @param testCompression passed on to both the stream setup and the writer
+   */
+  private def testFlumeStream(testCompression: Boolean): Unit = {
+    val expected = (1 to 100).map(_.toString)
+    val utils = new FlumeTestUtils
+    try {
+      val outputQueue = startContext(utils.getTestPort(), testCompression)
+
+      // Writing is retried until it succeeds or the timeout expires.
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        utils.writeInput(expected.asJava, testCompression)
+      }
+
+      eventually(timeout(10 seconds), interval(100 milliseconds)) {
+        val receivedEvents = outputQueue.asScala.toSeq.flatten.map(_.event)
+        for (event <- receivedEvents) {
+          event.getHeaders.get("test") should be("header")
+        }
+        val receivedBodies = receivedEvents.map(event => JavaUtils.bytesToString(event.getBody))
+        receivedBodies should be (expected)
+      }
+    } finally {
+      Option(ssc).foreach(_.stop())
+      utils.close()
+    }
+  }
+
+  /**
+   * Creates the streaming context (stored in `ssc`), attaches a Flume stream on localhost
+   * at the given port, registers a TestOutputStream on it and starts the context.
+   *
+   * @return the queue that the TestOutputStream appends the collected events to
+   */
+  private def startContext(
+      testPort: Int, testCompression: Boolean): ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] = {
+    ssc = new StreamingContext(conf, Milliseconds(200))
+    val collected = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+    val flumeStream = FlumeUtils.createStream(
+      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
+    new TestOutputStream(flumeStream, collected).register()
+    ssc.start()
+    collected
+  }
+
+  /**
+   * Channel factory that puts a zlib decoder ("inflater") and a zlib encoder ("deflater")
+   * at the front of the pipeline before creating the channel.
+   *
+   * NOTE(review): no use of this class is visible in this suite - confirm it is still needed.
+   *
+   * @param compressionLevel level passed to the ZlibEncoder
+   */
+  private class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      // addFirst is called twice, so the decoder ends up ahead of the encoder
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92a32e7..b4cfa3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,9 @@
     <module>sql/hive</module>
     <module>external/docker-integration-tests</module>
     <module>assembly</module>
+    <module>external/flume</module>
+    <module>external/flume-sink</module>
+    <module>external/flume-assembly</module>
     <module>examples</module>
     <module>repl</module>
     <module>launcher</module>
@@ -123,6 +126,7 @@
     <yarn.version>${hadoop.version}</yarn.version>
     <hbase.version>0.98.17-hadoop2</hbase.version>
     <hbase.artifact>hbase</hbase.artifact>
+    <flume.version>1.6.0</flume.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <curator.version>2.4.0</curator.version>
     <hive.group>org.spark-project.hive</hive.group>
@@ -189,6 +193,7 @@
       during compilation if the dependency is transivite (e.g. "graphx/" 
depending on "core/" and
       needing Hadoop classes in the classpath to compile).
     -->
+    <flume.deps.scope>compile</flume.deps.scope>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <hbase.deps.scope>compile</hbase.deps.scope>
     <hive.deps.scope>compile</hive.deps.scope>
@@ -1591,6 +1596,46 @@
         <scope>compile</scope>
       </dependency>
       <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-core</artifactId>
+        <version>${flume.version}</version>
+        <scope>${flume.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-auth</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.mortbay.jetty</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-sdk</artifactId>
+        <version>${flume.version}</version>
+        <scope>${flume.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-core</artifactId>
         <version>${calcite.version}</version>
@@ -2442,6 +2487,9 @@
       that does not have them.
     -->
     <profile>
+      <id>flume-provided</id>
+    </profile>
+    <profile>
       <id>hadoop-provided</id>
     </profile>
     <profile>

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index dbe98d1..fb229b9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -39,9 +39,9 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(
-    streaming, streamingKafka
+    streaming, streamingFlumeSink, streamingFlume, streamingKafka
   ) = Seq(
-    "streaming", "streaming-kafka"
+    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka"
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
@@ -56,8 +56,8 @@ object BuildCommons {
     Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, networkYarn, streamingKafkaAssembly, 
streamingKinesisAslAssembly) =
-    Seq("assembly", "network-yarn", "streaming-kafka-assembly", 
"streaming-kinesis-asl-assembly")
+  val assemblyProjects@Seq(assembly, networkYarn, streamingFlumeAssembly, 
streamingKafkaAssembly, streamingKinesisAslAssembly) =
+    Seq("assembly", "network-yarn", "streaming-flume-assembly", 
"streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val copyJarsProjects@Seq(examples) = 
Seq("examples").map(ProjectRef(buildLocation, _))
@@ -283,6 +283,8 @@ object SparkBuild extends PomBuild {
   /* Hive console settings */
   enable(Hive.settings)(hive)
 
+  enable(Flume.settings)(streamingFlumeSink)
+
   enable(Java8TestSettings.settings)(java8Tests)
 
   enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
@@ -348,6 +350,10 @@ object Unsafe {
   )
 }
 
+/**
+ * Build settings for the Flume sink module: re-exports `sbtavro.SbtAvro.avroSettings`.
+ * Enabled for the `streamingFlumeSink` project in `SparkBuild`.
+ */
+object Flume {
+  lazy val settings = sbtavro.SbtAvro.avroSettings
+}
+
 object DockerIntegrationTests {
   // This serves to override the override specified in DependencyOverrides:
   lazy val settings = Seq(
@@ -526,7 +532,7 @@ object Assembly {
         
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, 
mName, hv) =>
-      if (mName.contains("streaming-kafka-assembly") || 
mName.contains("streaming-kinesis-asl-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || 
mName.contains("streaming-kafka-assembly") || 
mName.contains("streaming-kinesis-asl-assembly")) {
         // This must match the same name used in maven (see 
external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {
@@ -644,9 +650,9 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, 
testTags),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
streamingFlumeSink, yarn, testTags),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, 
testTags),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
streamingFlumeSink, yarn, testTags),
 
     // Skip actual catalyst, but include the subproject.
     // Catalyst is not public API and contains quasiquotes which break 
scaladoc.
@@ -665,7 +671,7 @@ object Unidoc {
       "-public",
       "-group", "Core Java API", packageList("api.java", "api.java.function"),
       "-group", "Spark Streaming", packageList(
-        "streaming.api.java", "streaming.kafka", "streaming.kinesis"
+        "streaming.api.java", "streaming.flume", "streaming.kafka", 
"streaming.kinesis"
       ),
       "-group", "MLlib", packageList(
         "mllib.classification", "mllib.clustering", "mllib.evaluation.binary", 
"mllib.linalg",

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py 
b/python/pyspark/streaming/flume.py
new file mode 100644
index 0000000..cd30483
--- /dev/null
+++ b/python/pyspark/streaming/flume.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+if sys.version >= "3":
+    from io import BytesIO
+else:
+    from StringIO import StringIO
+from py4j.protocol import Py4JJavaError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer, 
UTF8Deserializer, read_int
+from pyspark.streaming import DStream
+
+__all__ = ['FlumeUtils', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+    """
+    Decode a UTF-8 encoded byte string into text.
+
+    :param s:  Object exposing ``decode`` (the raw event body), or None
+    :return: ``s.decode('utf-8')``, or None when `s` is None
+    """
+    return None if s is None else s.decode('utf-8')
+
+
+class FlumeUtils(object):
+    """
+    Factory methods that build Flume-backed DStreams through the JVM-side
+    ``FlumeUtilsPythonHelper``.
+    """
+
+    @staticmethod
+    def createStream(ssc, hostname, port,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+                     enableDecompression=False,
+                     bodyDecoder=utf8_decoder):
+        """
+        Create an input stream that pulls events from Flume.
+
+        :param ssc:  StreamingContext object
+        :param hostname:  Hostname of the slave machine to which the flume data will be sent
+        :param port:  Port of the slave machine to which the flume data will be sent
+        :param storageLevel:  Storage level to use for storing the received objects
+        :param enableDecompression:  Should netty server decompress input stream
+        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        helper = FlumeUtils._get_helper(ssc._sc)
+        jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def createPollingStream(ssc, addresses,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+                            maxBatchSize=1000,
+                            parallelism=5,
+                            bodyDecoder=utf8_decoder):
+        """
+        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+        This stream will poll the sink for data and will pull events as they are available.
+
+        :param ssc:  StreamingContext object
+        :param addresses:  List of (host, port)s on which the Spark Sink is running.
+        :param storageLevel:  Storage level to use for storing the received objects
+        :param maxBatchSize:  The maximum number of events to be pulled from the Spark sink
+                              in a single RPC call
+        :param parallelism:  Number of concurrent requests this stream should send to the sink.
+                             Note that having a higher number of requests concurrently being pulled
+                             will result in this stream using more threads
+        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        # The JVM helper takes hosts and ports as two parallel lists.
+        hosts = []
+        ports = []
+        for (host, port) in addresses:
+            hosts.append(host)
+            ports.append(port)
+        helper = FlumeUtils._get_helper(ssc._sc)
+        jstream = helper.createPollingStream(
+            ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
+        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
+
+    @staticmethod
+    def _toPythonDStream(ssc, jstream, bodyDecoder):
+        """
+        Wrap the Java stream in a DStream of ``(headers, body)`` tuples.
+
+        :param ssc:  StreamingContext object
+        :param jstream:  Java stream returned by the JVM helper
+        :param bodyDecoder:  Function applied to the raw body of every event
+        :return: A DStream whose elements are (dict of headers, decoded body)
+        """
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+
+        def func(event):
+            # io.BytesIO wraps the raw header bytes on both Python 2 and Python 3, so no
+            # interpreter-version check (previously a fragile string comparison on
+            # sys.version) is needed. Imported here so the function is self-contained.
+            from io import BytesIO
+            headersBytes = BytesIO(event[0])
+            headers = {}
+            strSer = UTF8Deserializer()
+            # Header layout: an int count followed by that many (key, value) string pairs.
+            for i in range(0, read_int(headersBytes)):
+                key = strSer.loads(headersBytes)
+                value = strSer.loads(headersBytes)
+                headers[key] = value
+            body = bodyDecoder(event[1])
+            return (headers, body)
+        return stream.map(func)
+
+    @staticmethod
+    def _get_helper(sc):
+        """
+        Instantiate the JVM-side ``FlumeUtilsPythonHelper``.
+
+        :param sc:  SparkContext whose ``_jvm`` gateway is used
+        :return: The helper object
+        :raise TypeError: Re-raised after printing usage help when the error message shows
+                          that the helper resolved to a non-callable 'JavaPackage'
+        """
+        try:
+            return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
+                FlumeUtils._printErrorMsg(sc)
+            raise
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        """ Print instructions for adding the Flume libraries, using `sc.version`. """
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's Flume libraries not found in class path. Try one of the following.
+
+  1. Include the Flume library and its dependencies with in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index eb4696c..d010c0e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -45,6 +45,7 @@ from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, 
TopicAndPartition
+from pyspark.streaming.flume import FlumeUtils
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 from pyspark.streaming.listener import StreamingListener
 
@@ -1260,6 +1261,148 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
 
 
+class FlumeStreamTests(PySparkStreamingTestCase):
+    """
+    Tests for FlumeUtils.createStream, driven by the JVM-side FlumeTestUtils.
+    """
+    timeout = 20  # seconds
+    duration = 1
+
+    def setUp(self):
+        super(FlumeStreamTests, self).setUp()
+        self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
+
+    def tearDown(self):
+        # Run the base-class teardown even when closing the test utils fails, so the
+        # StreamingContext created in setUp is never left behind.
+        try:
+            if self._utils is not None:
+                self._utils.close()
+                self._utils = None
+        finally:
+            super(FlumeStreamTests, self).tearDown()
+
+    def _startContext(self, n, compressed):
+        # Start the StreamingContext and also collect the result
+        dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
+                                          enableDecompression=compressed)
+        result = []
+
+        def get_output(_, rdd):
+            # Keep at most n events
+            for event in rdd.collect():
+                if len(result) < n:
+                    result.append(event)
+        dstream.foreachRDD(get_output)
+        self.ssc.start()
+        return result
+
+    def _validateResult(self, input, result):
+        # Validate both the header and the body
+        header = {"test": "header"}
+        self.assertEqual(len(input), len(result))
+        for i in range(0, len(input)):
+            self.assertEqual(header, result[i][0])
+            self.assertEqual(input[i], result[i][1])
+
+    def _writeInput(self, input, compressed):
+        # Try to write input to the receiver until success or timeout
+        start_time = time.time()
+        while True:
+            try:
+                self._utils.writeInput(input, compressed)
+                break
+            except Exception:
+                # Only ordinary errors are retried; KeyboardInterrupt and SystemExit
+                # propagate immediately.
+                if time.time() - start_time < self.timeout:
+                    time.sleep(0.01)
+                else:
+                    raise
+
+    def test_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), False)
+        self._writeInput(input, False)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+    def test_compressed_flume_stream(self):
+        input = [str(i) for i in range(1, 101)]
+        result = self._startContext(len(input), True)
+        self._writeInput(input, True)
+        self.wait_for(result, len(input))
+        self._validateResult(input, result)
+
+
+class FlumePollingStreamTests(PySparkStreamingTestCase):
+    """
+    Tests for FlumeUtils.createPollingStream, driven by the JVM-side PollingFlumeTestUtils.
+    """
+    timeout = 20  # seconds
+    duration = 1
+    maxAttempts = 5
+
+    # NOTE(review): setUp/tearDown do not chain to the base class; every attempt creates and
+    # stops its own StreamingContext in _writeAndVerify. Presumably intentional -- confirm.
+    def setUp(self):
+        self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
+
+    def tearDown(self):
+        if self._utils is not None:
+            self._utils.close()
+            self._utils = None
+
+    def _writeAndVerify(self, ports):
+        # Set up the streaming context and input streams
+        ssc = StreamingContext(self.sc, self.duration)
+        try:
+            addresses = [("localhost", port) for port in ports]
+            dstream = FlumeUtils.createPollingStream(
+                ssc,
+                addresses,
+                maxBatchSize=self._utils.eventsPerBatch(),
+                parallelism=5)
+            outputBuffer = []
+
+            def get_output(_, rdd):
+                for e in rdd.collect():
+                    outputBuffer.append(e)
+
+            dstream.foreachRDD(get_output)
+            ssc.start()
+            self._utils.sendDatAndEnsureAllDataHasBeenReceived()
+
+            self.wait_for(outputBuffer, self._utils.getTotalEvents())
+            outputHeaders = [event[0] for event in outputBuffer]
+            outputBodies = [event[1] for event in outputBuffer]
+            self._utils.assertOutput(outputHeaders, outputBodies)
+        finally:
+            ssc.stop(False)
+
+    def _testMultipleTimes(self, f):
+        # Run f until it succeeds, giving up after maxAttempts failures
+        attempt = 0
+        while True:
+            try:
+                f()
+                break
+            except Exception:
+                # Only ordinary failures are retried; KeyboardInterrupt and SystemExit
+                # propagate immediately.
+                attempt += 1
+                if attempt >= self.maxAttempts:
+                    raise
+                else:
+                    import traceback
+                    traceback.print_exc()
+
+    def _testFlumePolling(self):
+        try:
+            port = self._utils.startSingleSink()
+            self._writeAndVerify([port])
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def _testFlumePollingMultipleHosts(self):
+        try:
+            # Start several sinks so this case differs from _testFlumePolling.
+            # startMultipleSinks is defined in PollingFlumeTestUtils (outside this view) and
+            # is expected to return the ports of the sinks it started -- verify.
+            ports = self._utils.startMultipleSinks()
+            self._writeAndVerify(ports)
+            self._utils.assertChannelsAreEmpty()
+        finally:
+            self._utils.close()
+
+    def test_flume_polling(self):
+        self._testMultipleTimes(self._testFlumePolling)
+
+    def test_flume_polling_multiple_hosts(self):
+        self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+
+
 class KinesisStreamTests(PySparkStreamingTestCase):
 
     def test_kinesis_stream_api(self):
@@ -1348,6 +1491,23 @@ def search_kafka_assembly_jar():
         return jars[0]
 
 
+def search_flume_assembly_jar():
+    """
+    Locate the Spark Streaming Flume assembly jar under $SPARK_HOME/external/flume-assembly.
+
+    :return: Path of the single matching jar
+    :raise Exception: If no jar, or more than one jar, is found
+    """
+    flume_assembly_dir = os.path.join(os.environ["SPARK_HOME"], "external/flume-assembly")
+    jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
+    if not jars:
+        build_hint = (
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
+            "'build/mvn package' before running this test.")
+        raise Exception(
+            "Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir
+            + build_hint)
+    if len(jars) > 1:
+        raise Exception(
+            "Found multiple Spark Streaming Flume assembly JARs: %s; please "
+            "remove all but one" % ", ".join(jars))
+    return jars[0]
+
+
 def search_kinesis_asl_assembly_jar():
     SPARK_HOME = os.environ["SPARK_HOME"]
     kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, 
"external/kinesis-asl-assembly")
@@ -1368,18 +1528,20 @@ are_kinesis_tests_enabled = 
os.environ.get(kinesis_test_environ_var) == '1'
 if __name__ == "__main__":
     from pyspark.streaming.tests import *
     kafka_assembly_jar = search_kafka_assembly_jar()
+    flume_assembly_jar = search_flume_assembly_jar()
     kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
 
     if kinesis_asl_assembly_jar is None:
         kinesis_jar_present = False
-        jars = kafka_assembly_jar
+        jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
     else:
         kinesis_jar_present = True
-        jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
+        jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
kinesis_asl_assembly_jar)
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, 
StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, StreamingListenerTests]
+                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests,
+                 StreamingListenerTests]
 
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 10ee7d5..1eb680d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -18,6 +18,8 @@
 package test.org.apache.spark.sql;
 
 import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -303,27 +305,42 @@ public class JavaDataFrameSuite {
     Assert.assertEquals(30000.0, actual.get(1).getDouble(2), 0.01);
   }
 
+  /**
+   * Returns the decoded path of a resource looked up with the context class loader.
+   *
+   * @param resource name of the resource to look up
+   * @return the path component of the resource's URI
+   * @throws IllegalArgumentException if the class loader cannot find the resource
+   * @throws RuntimeException if the resource URL cannot be converted to a URI
+   */
+  private String getResource(String resource) {
+    // The following "getResource" has different behaviors in SBT and Maven.
+    // When running in Jenkins, the file path may contain "@" when there are multiple
+    // SparkPullRequestBuilders running in the same worker
+    // (e.g., /home/jenkins/workspace/SparkPullRequestBuilder@2)
+    // When running in SBT, "@" in the file path will be returned as "@", however,
+    // when running in Maven, "@" will be encoded as "%40".
+    // Therefore, we convert it to a URI and then call "getPath" to decode it back so that
+    // it works in both SBT and Maven.
+    URL url = Thread.currentThread().getContextClassLoader().getResource(resource);
+    if (url == null) {
+      // Fail with the resource name instead of an anonymous NullPointerException.
+      throw new IllegalArgumentException("Resource not found on the classpath: " + resource);
+    }
+    try {
+      return url.toURI().getPath();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Test
   public void testGenericLoad() {
-    Dataset<Row> df1 = context.read().format("text").load(
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
+    Dataset<Row> df1 = 
context.read().format("text").load(getResource("text-suite.txt"));
     Assert.assertEquals(4L, df1.count());
 
     Dataset<Row> df2 = context.read().format("text").load(
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
+      getResource("text-suite.txt"),
+      getResource("text-suite2.txt"));
     Assert.assertEquals(5L, df2.count());
   }
 
   @Test
   public void testTextLoad() {
-    Dataset<String> ds1 = context.read().text(
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
+    Dataset<String> ds1 = context.read().text(getResource("text-suite.txt"));
     Assert.assertEquals(4L, ds1.count());
 
     Dataset<String> ds2 = context.read().text(
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
-      
Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
+      getResource("text-suite.txt"),
+      getResource("text-suite2.txt"));
     Assert.assertEquals(5L, ds2.count());
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to