Repository: spark
Updated Branches:
  refs/heads/master 04c840910 -> 1afdeb7b4


[STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird errors when 
Kinesis tests are enabled without AWS keys

If Kinesis tests are enabled by env ENABLE_KINESIS_TESTS = 1 but no AWS 
credentials are found, the desired behavior is to fail the test with
```
Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite *** ABORTED *** 
(3 seconds, 5 milliseconds)
[info]   java.lang.Exception: Kinesis tests enabled, but could get not AWS 
credentials
```

Instead KinesisStreamSuite fails with

```
[info] - basic operation *** FAILED *** (3 seconds, 35 milliseconds)
[info]   java.lang.IllegalArgumentException: requirement failed: Stream not yet 
created, call createStream() to create one
[info]   at scala.Predef$.require(Predef.scala:233)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisTestUtils.streamName(KinesisTestUtils.scala:77)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.Logging$class.logWarning(Logging.scala:71)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisTestUtils.logWarning(KinesisTestUtils.scala:39)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisTestUtils.deleteStream(KinesisTestUtils.scala:150)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply$mcV$sp(KinesisStreamSuite.scala:111)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
[info]   at 
org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
```
This is because attempting to delete a non-existent Kinesis stream throws 
an uncaught exception. This PR fixes it.

Author: Tathagata Das <[email protected]>

Closes #7809 from tdas/kinesis-test-hotfix and squashes the following commits:

7c372e6 [Tathagata Das] Fixed test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1afdeb7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1afdeb7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1afdeb7b

Branch: refs/heads/master
Commit: 1afdeb7b458f86e2641f062fb9ddc00e9c5c7531
Parents: 04c8409
Author: Tathagata Das <[email protected]>
Authored: Thu Jul 30 16:44:02 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu Jul 30 16:44:02 2015 -0700

----------------------------------------------------------------------
 .../streaming/kinesis/KinesisTestUtils.scala    | 27 ++++++++++----------
 .../streaming/kinesis/KinesisStreamSuite.scala  |  4 +--
 2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ff1b7e..ca39358 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -53,6 +53,8 @@ private class KinesisTestUtils(
 
   @volatile
   private var streamCreated = false
+
+  @volatile
   private var _streamName: String = _
 
   private lazy val kinesisClient = {
@@ -115,21 +117,9 @@ private class KinesisTestUtils(
     shardIdToSeqNumbers.toMap
   }
 
-  def describeStream(streamNameToDescribe: String = streamName): 
Option[StreamDescription] = {
-    try {
-      val describeStreamRequest = new 
DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = 
kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
-      Some(desc)
-    } catch {
-      case rnfe: ResourceNotFoundException =>
-        None
-    }
-  }
-
   def deleteStream(): Unit = {
     try {
-      if (describeStream().nonEmpty) {
-        val deleteStreamRequest = new DeleteStreamRequest()
+      if (streamCreated) {
         kinesisClient.deleteStream(streamName)
       }
     } catch {
@@ -149,6 +139,17 @@ private class KinesisTestUtils(
     }
   }
 
+  private def describeStream(streamNameToDescribe: String): 
Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new 
DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = 
kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
   private def findNonExistentStreamName(): String = {
     var testStreamName: String = null
     do {

http://git-wip-us.apache.org/repos/asf/spark/blob/1afdeb7b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index f9c952b..b88c9c6 100644
--- 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -88,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite
     try {
       kinesisTestUtils.createStream()
       ssc = new StreamingContext(sc, Seconds(1))
-      val aWSCredentials = KinesisTestUtils.getAWSCredentials()
+      val awsCredentials = KinesisTestUtils.getAWSCredentials()
       val stream = KinesisUtils.createStream(ssc, kinesisAppName, 
kinesisTestUtils.streamName,
         kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, 
InitialPositionInStream.LATEST,
         Seconds(10), StorageLevel.MEMORY_ONLY,
-        aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
+        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
       val collected = new mutable.HashSet[Int] with 
mutable.SynchronizedSet[Int]
       stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to