diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 47071d0cc4714..27bf2ac962721 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} +import java.util.Random import scala.collection.JavaConversions._ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} @@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._ class FlumePollingStreamSuite extends TestSuiteBase { - val testPort = 9999 + val random = new Random() + /** Return a port in the ephemeral range. */ + def getTestPort = random.nextInt(16382) + 49152 val batchCount = 5 val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 test("flume polling test") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = @@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { } test("flume polling test multiple hosts") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))