Repository: spark
Updated Branches:
  refs/heads/branch-1.1 7286d5707 -> 1d468df33


[SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.

Currently, a lot of errors get thrown from the Avro IPC layer when the
DStream or sink is shut down. This PR cleans that up. The receiver code is
refactored to put all of the RPC code into a single Try and recover from
failures in one place. The sink code has also been cleaned up.
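
For illustration, here is a minimal, self-contained sketch of the single-Try
pattern described above. The RpcClient trait, EventBatch case class and method
names are illustrative stand-ins, not the actual Spark/Flume sink API:

  import scala.util.Try

  object FlumeFetchSketch {
    final case class EventBatch(sequenceNumber: String, events: Seq[Array[Byte]])

    trait RpcClient {
      def getEventBatch(maxEvents: Int): EventBatch  // stand-in for the Avro RPC call
      def ack(sequenceNumber: String): Unit
      def nack(sequenceNumber: String): Unit
    }

    /** One fetch-store-ack cycle, with all of the RPC work inside a single Try. */
    def fetchOnce(client: RpcClient, maxBatchSize: Int, store: Seq[Array[Byte]] => Unit,
        isStopped: () => Boolean): Unit = {
      Try {
        val batch = client.getEventBatch(maxBatchSize)  // RPC
        store(batch.events)                             // hand the events to the receiver
        client.ack(batch.sequenceNumber)                // RPC
      } recover {
        // Single recovery point: stay quiet if the interruption came from shutdown,
        // otherwise log and let the caller move on to the next iteration.
        case _: InterruptedException if isStopped() => ()
        case e: Exception =>
          println(s"Error while receiving data from Flume: ${e.getMessage}")
      }
    }
  }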

Author: Hari Shreedharan <[email protected]>

Closes #2065 from harishreedharan/clean-flume-shutdown and squashes the following commits:

f93a07c [Hari Shreedharan] Formatting fixes.
d7427cc [Hari Shreedharan] More fixes!
a0a8852 [Hari Shreedharan] Fix race condition, hopefully! Minor other changes.
4c9ed02 [Hari Shreedharan] Remove unneeded list in Callback handler. Other misc changes.
8fee36f [Hari Shreedharan] Scala-library is required, else maven build fails. Also catch InterruptedException in TxnProcessor.
445e700 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
87232e0 [Hari Shreedharan] Refactor Flume Input Stream. Clean up code, better error handling.
9001d26 [Hari Shreedharan] Change log level to debug in TransactionProcessor#shutdown method
e7b8d82 [Hari Shreedharan] Incorporate review feedback
598efa7 [Hari Shreedharan] Clean up some exception handling code
e1027c6 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into clean-flume-shutdown
ed608c8 [Hari Shreedharan] [SPARK-3154][STREAMING] Make FlumePollingInputDStream shutdown cleaner.

(cherry picked from commit 6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d468df3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d468df3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d468df3

Branch: refs/heads/branch-1.1
Commit: 1d468df33c7b8680af12fcdb66ed91f48c80cae3
Parents: 7286d57
Author: Hari Shreedharan <[email protected]>
Authored: Wed Aug 27 02:39:02 2014 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Aug 27 02:39:21 2014 -0700

----------------------------------------------------------------------
 external/flume-sink/pom.xml                     |   4 +
 .../flume/sink/SparkAvroCallbackHandler.scala   |  56 +++++--
 .../flume/sink/TransactionProcessor.scala       |  18 +-
 .../streaming/flume/FlumeBatchFetcher.scala     | 167 +++++++++++++++++++
 .../flume/FlumePollingInputDStream.scala        |  77 ++-------
 5 files changed, 236 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d468df3/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index a297459..17d0fe2 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -71,6 +71,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
       <!--
         Netty explicitly added in test as it has been excluded from
         Flume dependency (to avoid runtime problems when running with

http://git-wip-us.apache.org/repos/asf/spark/blob/1d468df3/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 7da8eb3..e77cf7b 100644
--- 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.flume.sink
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConversions._
+
 import org.apache.flume.Channel
 import org.apache.commons.lang.RandomStringUtils
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -45,7 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: 
Int, val channel: Cha
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
     new ThreadFactoryBuilder().setDaemon(true)
       .setNameFormat("Spark Sink Processor Thread - %d").build()))
-  private val processorMap = new ConcurrentHashMap[CharSequence, 
TransactionProcessor]()
+  private val sequenceNumberToProcessor =
+    new ConcurrentHashMap[CharSequence, TransactionProcessor]()
   // This sink will not persist sequence numbers and reuses them if it gets 
restarted.
   // So it is possible to commit a transaction which may have been meant for 
the sink before the
   // restart.
@@ -55,6 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: 
Int, val channel: Cha
   private val seqBase = RandomStringUtils.randomAlphanumeric(8)
   private val seqCounter = new AtomicLong(0)
 
+  @volatile private var stopped = false
+
   /**
    * Returns a bunch of events to Spark over Avro RPC.
    * @param n Maximum number of events to return in a batch
@@ -63,18 +68,33 @@ private[flume] class SparkAvroCallbackHandler(val threads: 
Int, val channel: Cha
   override def getEventBatch(n: Int): EventBatch = {
     logDebug("Got getEventBatch call from Spark.")
     val sequenceNumber = seqBase + seqCounter.incrementAndGet()
-    val processor = new TransactionProcessor(channel, sequenceNumber,
-      n, transactionTimeout, backOffInterval, this)
-    transactionExecutorOpt.foreach(executor => {
-      executor.submit(processor)
-    })
-    // Wait until a batch is available - will be an error if error message is 
non-empty
-    val batch = processor.getEventBatch
-    if (!SparkSinkUtils.isErrorBatch(batch)) {
-      processorMap.put(sequenceNumber.toString, processor)
-      logDebug("Sending event batch with sequence number: " + sequenceNumber)
+    createProcessor(sequenceNumber, n) match {
+      case Some(processor) =>
+        transactionExecutorOpt.foreach(_.submit(processor))
+        // Wait until a batch is available - will be an error if error message 
is non-empty
+        val batch = processor.getEventBatch
+        if (SparkSinkUtils.isErrorBatch(batch)) {
+          // Remove the processor if it is an error batch since no ACK is sent.
+          removeAndGetProcessor(sequenceNumber)
+          logWarning("Received an error batch - no events were received from 
channel! ")
+        }
+        batch
+      case None =>
+        new EventBatch("Spark sink has been stopped!", "", 
java.util.Collections.emptyList())
+    }
+  }
+
+  private def createProcessor(seq: String, n: Int): 
Option[TransactionProcessor] = {
+    sequenceNumberToProcessor.synchronized {
+      if (!stopped) {
+        val processor = new TransactionProcessor(
+          channel, seq, n, transactionTimeout, backOffInterval, this)
+        sequenceNumberToProcessor.put(seq, processor)
+        Some(processor)
+      } else {
+        None
+      }
     }
-    batch
   }
 
   /**
@@ -116,7 +136,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: 
Int, val channel: Cha
    *         longer tracked and the caller is responsible for that txn 
processor.
    */
   private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): 
TransactionProcessor = {
-    processorMap.remove(sequenceNumber.toString) // The toString is required!
+    sequenceNumberToProcessor.synchronized {
+      sequenceNumberToProcessor.remove(sequenceNumber.toString)
+    }
   }
 
   /**
@@ -124,8 +146,10 @@ private[flume] class SparkAvroCallbackHandler(val threads: 
Int, val channel: Cha
    */
   def shutdown() {
     logInfo("Shutting down Spark Avro Callback Handler")
-    transactionExecutorOpt.foreach(executor => {
-      executor.shutdownNow()
-    })
+    sequenceNumberToProcessor.synchronized {
+      stopped = true
+      sequenceNumberToProcessor.values().foreach(_.shutdown())
+    }
+    transactionExecutorOpt.foreach(_.shutdownNow())
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d468df3/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
----------------------------------------------------------------------
diff --git 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b9e3c78..13f3aa9 100644
--- 
a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ 
b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -60,6 +60,8 @@ private class TransactionProcessor(val channel: Channel, val 
seqNum: String,
   // succeeded.
   @volatile private var batchSuccess = false
 
+  @volatile private var stopped = false
+
   // The transaction that this processor would handle
   var txOpt: Option[Transaction] = None
 
@@ -88,6 +90,11 @@ private class TransactionProcessor(val channel: Channel, val 
seqNum: String,
     batchAckLatch.countDown()
   }
 
+  private[flume] def shutdown(): Unit = {
+    logDebug("Shutting down transaction processor")
+    stopped = true
+  }
+
   /**
    * Populates events into the event batch. If the batch cannot be populated,
    * this method will not set the events into the event batch, but it sets an 
error message.
@@ -106,7 +113,7 @@ private class TransactionProcessor(val channel: Channel, 
val seqNum: String,
         var gotEventsInThisTxn = false
         var loopCounter: Int = 0
         loop.breakable {
-          while (events.size() < maxBatchSize
+          while (!stopped && events.size() < maxBatchSize
             && loopCounter < totalAttemptsToRemoveFromChannel) {
             loopCounter += 1
             Option(channel.take()) match {
@@ -115,7 +122,7 @@ private class TransactionProcessor(val channel: Channel, 
val seqNum: String,
                   ByteBuffer.wrap(event.getBody)))
                 gotEventsInThisTxn = true
               case None =>
-                if (!gotEventsInThisTxn) {
+                if (!gotEventsInThisTxn && !stopped) {
                   logDebug("Sleeping for " + backOffInterval + " millis as no 
events were read in" +
                     " the current transaction")
                   TimeUnit.MILLISECONDS.sleep(backOffInterval)
@@ -125,7 +132,7 @@ private class TransactionProcessor(val channel: Channel, 
val seqNum: String,
             }
           }
         }
-        if (!gotEventsInThisTxn) {
+        if (!gotEventsInThisTxn && !stopped) {
           val msg = "Tried several times, " +
             "but did not get any events from the channel!"
           logWarning(msg)
@@ -136,6 +143,11 @@ private class TransactionProcessor(val channel: Channel, 
val seqNum: String,
         }
       })
     } catch {
+      case interrupted: InterruptedException =>
+        // Don't pollute logs if the InterruptedException came from this being 
stopped
+        if (!stopped) {
+          logWarning("Error while processing transaction.", interrupted)
+        }
       case e: Exception =>
         logWarning("Error while processing transaction.", e)
         eventBatch.setErrorMsg(e.getMessage)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d468df3/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
new file mode 100644
index 0000000..88cc2aa
--- /dev/null
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * This class implements the core functionality of [[FlumePollingReceiver]]. When started, it
+ * pulls data from Flume, stores it to Spark and then sends an ack or nack. This class should
+ * be run via a [[java.util.concurrent.Executor]] as it implements [[Runnable]].
+ *
+ * @param receiver The receiver that owns this instance.
+ */
+
+private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
+  Logging {
+
+  def run(): Unit = {
+    while (!receiver.isStopped()) {
+      val connection = receiver.getConnections.poll()
+      val client = connection.client
+      var batchReceived = false
+      var seq: CharSequence = null
+      try {
+        getBatch(client) match {
+          case Some(eventBatch) =>
+            batchReceived = true
+            seq = eventBatch.getSequenceNumber
+            val events = toSparkFlumeEvents(eventBatch.getEvents)
+            if (store(events)) {
+              sendAck(client, seq)
+            } else {
+              sendNack(batchReceived, client, seq)
+            }
+          case None =>
+        }
+      } catch {
+        case e: Exception =>
+          Throwables.getRootCause(e) match {
+            // If the cause was an InterruptedException, check whether the receiver is
+            // stopped - if so, just break out of the loop; otherwise send a Nack and log
+            // a warning. In the unlikely case that the cause was not an Exception, just
+            // throw it out and exit.
+            case interrupted: InterruptedException =>
+              if (!receiver.isStopped()) {
+                logWarning("Interrupted while receiving data from Flume", 
interrupted)
+                sendNack(batchReceived, client, seq)
+              }
+            case exception: Exception =>
+              logWarning("Error while receiving data from Flume", exception)
+              sendNack(batchReceived, client, seq)
+          }
+      } finally {
+        receiver.getConnections.add(connection)
+      }
+    }
+  }
+
+  /**
+   * Gets a batch of events from the specified client. This method does not handle any
+   * exceptions, which will be propagated to the caller.
+   * @param client Client to get events from
+   * @return [[Some]] containing the event batch if Flume sent any events back, else [[None]]
+   */
+  private def getBatch(client: SparkFlumeProtocol.Callback): 
Option[EventBatch] = {
+    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
+    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+      // No error, proceed with processing data
+      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with 
sequence " +
+        s"number: ${eventBatch.getSequenceNumber}")
+      Some(eventBatch)
+    } else {
+      logWarning("Did not receive events from Flume agent due to error on the 
Flume agent: " +
+        eventBatch.getErrorMsg)
+      None
+    }
+  }
+
+  /**
+   * Stores the events in the buffer to Spark. This method will not propagate any exceptions,
+   * but will propagate any other errors.
+   * @param buffer The buffer to store
+   * @return true if the data was stored without any exception being thrown, else false
+   */
+  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
+    try {
+      receiver.store(buffer)
+      true
+    } catch {
+      case e: Exception =>
+        logWarning("Error while attempting to store data received from Flume", 
e)
+        false
+    }
+  }
+
+  /**
+   * Sends an ack to the client for the given sequence number. This method does not handle
+   * any exceptions, which will be propagated to the caller.
+   * @param client client to send the ack to
+   * @param seq sequence number of the batch to be ack-ed.
+   */
+  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): 
Unit = {
+    logDebug("Sending ack for sequence number: " + seq)
+    client.ack(seq)
+    logDebug("Ack sent for sequence number: " + seq)
+  }
+
+  /**
+   * Sends a Nack to the client for the given sequence number, if a batch was received. Any
+   * exceptions thrown by the RPC call are simply rethrown as-is; no effort is made to
+   * handle them.
+   * @param batchReceived true if a batch was received. If this is false, no nack is sent
+   * @param client The client to which the nack should be sent
+   * @param seq The sequence number of the batch that is being nack-ed.
+   */
+  private def sendNack(batchReceived: Boolean, client: 
SparkFlumeProtocol.Callback,
+    seq: CharSequence): Unit = {
+    if (batchReceived) {
+      // Let Flume know that the events need to be pushed back into the 
channel.
+      logDebug("Sending nack for sequence number: " + seq)
+      client.nack(seq) // If the agent is down, even this could fail and throw
+      logDebug("Nack sent for sequence number: " + seq)
+    }
+  }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
+   * @param events - Events to convert to SparkFlumeEvents
+   * @return The SparkFlumeEvents generated from the SparkSinkEvents
+   */
+  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
+    ArrayBuffer[SparkFlumeEvent] = {
+    // Convert each Flume event to a serializable SparkFlumeEvent
+    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+    var j = 0
+    while (j < events.size()) {
+      val event = events(j)
+      val sparkFlumeEvent = new SparkFlumeEvent()
+      sparkFlumeEvent.event.setBody(event.getBody)
+      sparkFlumeEvent.event.setHeaders(event.getHeaders)
+      buffer += sparkFlumeEvent
+      j += 1
+    }
+    buffer
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1d468df3/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 148262b..92fa5b4 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,10 +18,9 @@ package org.apache.spark.streaming.flume
 
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+import java.util.concurrent.{LinkedBlockingQueue, Executors}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -86,61 +85,9 @@ private[streaming] class FlumePollingReceiver(
       connections.add(new FlumeConnection(transceiver, client))
     })
     for (i <- 0 until parallelism) {
-      logInfo("Starting Flume Polling Receiver worker threads starting..")
+      logInfo("Starting Flume Polling Receiver worker threads..")
       // Threads that pull data from Flume.
-      receiverExecutor.submit(new Runnable {
-        override def run(): Unit = {
-          while (true) {
-            val connection = connections.poll()
-            val client = connection.client
-            try {
-              val eventBatch = client.getEventBatch(maxBatchSize)
-              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
-                // No error, proceed with processing data
-                val seq = eventBatch.getSequenceNumber
-                val events: java.util.List[SparkSinkEvent] = 
eventBatch.getEvents
-                logDebug(
-                  "Received batch of " + events.size() + " events with 
sequence number: " + seq)
-                try {
-                  // Convert each Flume event to a serializable SparkFlumeEvent
-                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
-                  var j = 0
-                  while (j < events.size()) {
-                    buffer += toSparkFlumeEvent(events(j))
-                    j += 1
-                  }
-                  store(buffer)
-                  logDebug("Sending ack for sequence number: " + seq)
-                  // Send an ack to Flume so that Flume discards the events 
from its channels.
-                  client.ack(seq)
-                  logDebug("Ack sent for sequence number: " + seq)
-                } catch {
-                  case e: Exception =>
-                    try {
-                      // Let Flume know that the events need to be pushed back 
into the channel.
-                      logDebug("Sending nack for sequence number: " + seq)
-                      client.nack(seq) // If the agent is down, even this 
could fail and throw
-                      logDebug("Nack sent for sequence number: " + seq)
-                    } catch {
-                      case e: Exception => logError(
-                        "Sending Nack also failed. A Flume agent is down.")
-                    }
-                    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a 
fixed 2 seconds.
-                    logWarning("Error while attempting to store events", e)
-                }
-              } else {
-                logWarning("Did not receive events from Flume agent due to 
error on the Flume " +
-                  "agent: " + eventBatch.getErrorMsg)
-              }
-            } catch {
-              case e: Exception =>
-                logWarning("Error while reading data from Flume", e)
-            } finally {
-              connections.add(connection)
-            }
-          }
-        }
-      })
+      receiverExecutor.submit(new FlumeBatchFetcher(this))
     }
   }
 
@@ -153,16 +100,12 @@ private[streaming] class FlumePollingReceiver(
     channelFactory.releaseExternalResources()
   }
 
-  /**
-   * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
-   * @param event - Event to convert to SparkFlumeEvent
-   * @return - The SparkFlumeEvent generated from SparkSinkEvent
-   */
-  private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
-    val sparkFlumeEvent = new SparkFlumeEvent()
-    sparkFlumeEvent.event.setBody(event.getBody)
-    sparkFlumeEvent.event.setHeaders(event.getHeaders)
-    sparkFlumeEvent
+  private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
+    this.connections
+  }
+
+  private[flume] def getMaxBatchSize: Int = {
+    this.maxBatchSize
   }
 }
 
@@ -171,7 +114,7 @@ private[streaming] class FlumePollingReceiver(
  * @param transceiver The transceiver to use for communication with Flume
  * @param client The client that the callbacks are received on.
  */
-private class FlumeConnection(val transceiver: NettyTransceiver,
+private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
 

