This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 61a497e  AWS Kinesis test fixes when in remote mode
     new 94de150  Merge pull request #345 from orpiske/fix-kinesis-qa
61a497e is described below

commit 61a497e069df96bd055da62f96a6d6e812c2346c
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Jul 29 13:24:09 2020 +0200

    AWS Kinesis test fixes when in remote mode
    
    Includes:
    
    - Check for the stream presence before creating it
    - Adjusted the code to use a different stream for every test
    - Removed unused test constants
    - Removed unnecessary sleep after deletion
---
 .../camel/kafkaconnector/aws/common/AWSCommon.java | 10 ---
 .../source/CamelSourceAWSKinesisITCase.java        | 90 +++++++++++++++-------
 2 files changed, 64 insertions(+), 36 deletions(-)

diff --git 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
 
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
index f5abf42..98b2c4e 100644
--- 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
+++ 
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
@@ -38,20 +38,10 @@ public final class AWSCommon {
     public static final String DEFAULT_SQS_QUEUE_FOR_SNS = "ckcsns";
 
     /**
-     * The default SNS queue name used during the tests
-     */
-    public static final String DEFAULT_SNS_QUEUE = "ckc-sns";
-
-    /**
      * The default S3 bucket name used during the tests
      */
     public static final String DEFAULT_S3_BUCKET = "ckc-s3";
 
-    /**
-     * The default Kinesis stream name used during the tests
-     */
-    public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream";
-
     private AWSCommon() {
 
     }
diff --git 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
 
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
index 7ed229b..d27bac1 100644
--- 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ 
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -27,10 +27,12 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.CreateStreamResult;
 import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import org.apache.camel.kafkaconnector.aws.common.AWSCommon;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import org.apache.camel.kafkaconnector.aws.services.AWSService;
 import org.apache.camel.kafkaconnector.aws.services.AWSServiceFactory;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -54,9 +56,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     @RegisterExtension
     public static AWSService<AmazonKinesis> service = 
AWSServiceFactory.createKinesisService();
-    
+
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
 
+    private static final String KINESIS_STREAM_BASE_NAME = "ckc-kin-stream";
+    private String streamName;
+
     private AmazonKinesis awsKinesisClient;
 
     private volatile int received;
@@ -67,13 +72,8 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         return new String[] {"camel-aws-kinesis-kafka-connector"};
     }
 
-
-    @BeforeEach
-    public void setUp() {
-        awsKinesisClient = service.getClient();
-        received = 0;
-
-        CreateStreamResult result = 
awsKinesisClient.createStream(AWSCommon.DEFAULT_KINESIS_STREAM, 1);
+    private void doCreateStream() {
+        CreateStreamResult result = awsKinesisClient.createStream(streamName, 
1);
         if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
             fail("Failed to create the stream");
         } else {
@@ -81,30 +81,67 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         }
     }
 
-    @AfterEach
-    public void tearDown() {
-        DeleteStreamResult result = 
awsKinesisClient.deleteStream(AWSCommon.DEFAULT_KINESIS_STREAM);
+    private void createStream() {
+        try {
+            LOG.info("Checking whether the stream exists already");
+            DescribeStreamResult describeStreamResult = 
awsKinesisClient.describeStream(streamName);
+
+            int status = 
describeStreamResult.getSdkHttpMetadata().getHttpStatusCode();
+            LOG.info("Kinesis stream check result: {}", status);
+        } catch (ResourceNotFoundException e) {
+            LOG.info("The stream does not exist, auto creating it ...");
+            doCreateStream();
+        }
+    }
+
+    private void doDeleteStream() {
+        DeleteStreamResult result = awsKinesisClient.deleteStream(streamName);
 
         if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
             fail("Failed to delete the stream");
         } else {
-            try {
-                // Because of the latency used to simulate the Kinesis API 
call (defined by the KINESIS_LATENCY) in
-                // the LocalStack configuration, the test needs to wait at 
least the same amount of time as set there
-                // in order to proceed. Otherwise the it fails to create the 
stream in the setUp phase.
-                // Ref.: 
https://github.com/localstack/localstack/issues/231#issuecomment-319959693
-                Thread.sleep(500);
-                LOG.info("Stream deleted successfully");
-            } catch (InterruptedException e) {
-                fail("Test interrupted while waiting for the stream to cool 
down");
-            }
+            LOG.info("Stream deleted successfully");
         }
+    }
+
+    private void deleteStream() {
+        try {
+            LOG.info("Checking whether the stream exists already");
+            DescribeStreamResult describeStreamResult = 
awsKinesisClient.describeStream(streamName);
+
+            int status = 
describeStreamResult.getSdkHttpMetadata().getHttpStatusCode();
+            LOG.info("Kinesis stream check result: {}", status);
+            doDeleteStream();
+        } catch (ResourceNotFoundException e) {
+            LOG.info("The stream does not exist, skipping deletion");
+        } catch (ResourceInUseException e) {
+            LOG.info("The stream exist but cannot be deleted because it's in 
use");
+            doDeleteStream();
+        }
+    }
+
+
+    @BeforeEach
+    public void setUp() {
+        streamName = KINESIS_STREAM_BASE_NAME + "-" + 
TestUtils.randomWithRange(0, 100);
+
+        awsKinesisClient = service.getClient();
+        received = 0;
+
+        createStream();
+    }
+
+
+    @AfterEach
+    public void tearDown() {
+        deleteStream();
 
         awsKinesisClient.shutdown();
 
         deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
+
     private boolean checkRecord(ConsumerRecord<String, String> record) {
         LOG.debug("Received: {}", record.value());
         received++;
@@ -139,11 +176,12 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
                 .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                 .withAmazonConfig(service.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+                .withStreamName(streamName);
 
         runtTest(connectorPropertyFactory);
     }
 
+
     @Test
     @Timeout(120)
     public void testBasicSendReceiveWithKafkaStyle() throws 
ExecutionException, InterruptedException {
@@ -152,7 +190,7 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
                 .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                 .withAmazonConfig(service.getConnectionProperties(), 
CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
                 .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+                .withStreamName(streamName);
 
         runtTest(connectorPropertyFactory);
     }
@@ -165,7 +203,7 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
                 .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                 .withAmazonConfig(service.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withUrl(AWSCommon.DEFAULT_KINESIS_STREAM)
+                .withUrl(streamName)
                     .buildUrl();
 
         runtTest(connectorPropertyFactory);
@@ -173,7 +211,7 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
 
     private void putRecords() {
         PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
-        putRecordsRequest.setStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+        putRecordsRequest.setStreamName(streamName);
 
         List<PutRecordsRequestEntry> putRecordsRequestEntryList = new 
ArrayList<>();
 

Reply via email to