Repository: spark Updated Branches: refs/heads/master d3a3840e0 -> 8764fe368
SPARK-3744 [STREAMING] FlumeStreamSuite will fail during port contention Since it looked quite easy, I took the liberty of making a quick PR that just uses `Utils.startServiceOnPort` to fix this. It works locally for me. Author: Sean Owen <[email protected]> Closes #2601 from srowen/SPARK-3744 and squashes the following commits: ddc9319 [Sean Owen] Avoid port contention in tests by retrying several ports for Flume stream Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8764fe36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8764fe36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8764fe36 Branch: refs/heads/master Commit: 8764fe368bbd72fe76ed318faad0e97a7279e2fe Parents: d3a3840 Author: Sean Owen <[email protected]> Authored: Tue Sep 30 15:18:51 2014 -0700 Committer: Tathagata Das <[email protected]> Committed: Tue Sep 30 15:18:51 2014 -0700 ---------------------------------------------------------------------- .../streaming/flume/FlumeStreamSuite.scala | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8764fe36/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 6ee7ac9..33235d1 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.util.Utils import org.jboss.netty.channel.ChannelPipeline import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory @@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._ class FlumeStreamSuite extends TestSuiteBase { test("flume input stream") { - runFlumeStreamTest(false, 9998) + runFlumeStreamTest(false) } test("flume input compressed stream") { - runFlumeStreamTest(true, 9997) + runFlumeStreamTest(true) } - def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) { + def runFlumeStreamTest(enableDecompression: Boolean) { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] = - FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) + val (flumeStream, testPort) = + Utils.startServiceOnPort(9997, (trialPort: Int) => { + val dstream = FlumeUtils.createStream( + ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) + (dstream, trialPort) + }) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer) + val outputStream = new TestOutputStream(flumeStream, outputBuffer) outputStream.register() ssc.start() @@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase { val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - var client: AvroSourceProtocol = null; - + var client: AvroSourceProtocol = null + if (enableDecompression) { client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], new NettyTransceiver(new InetSocketAddress("localhost", testPort), - new CompressionChannelFactory(6))); + new CompressionChannelFactory(6))) } else { client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
