This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 61a497e AWS Kinesis test fixes when in remote mode new 94de150 Merge pull request #345 from orpiske/fix-kinesis-qa 61a497e is described below commit 61a497e069df96bd055da62f96a6d6e812c2346c Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Jul 29 13:24:09 2020 +0200 AWS Kinesis test fixes when in remote mode Includes: - Check for the stream presence before creating it - Adjusted the code to use a different stream every test - Removed unused test constants - Removes unnecessary sleep after deletion --- .../camel/kafkaconnector/aws/common/AWSCommon.java | 10 --- .../source/CamelSourceAWSKinesisITCase.java | 90 +++++++++++++++------- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java index f5abf42..98b2c4e 100644 --- a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java +++ b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java @@ -38,20 +38,10 @@ public final class AWSCommon { public static final String DEFAULT_SQS_QUEUE_FOR_SNS = "ckcsns"; /** - * The default SNS queue name used during the tests - */ - public static final String DEFAULT_SNS_QUEUE = "ckc-sns"; - - /** * The default S3 bucket name used during the tests */ public static final String DEFAULT_S3_BUCKET = "ckc-s3"; - /** - * The default Kinesis stream name used during the tests - */ - public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream"; - private AWSCommon() { } diff --git a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java index 7ed229b..d27bac1 100644 --- 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -27,10 +27,12 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.CreateStreamResult; import com.amazonaws.services.kinesis.model.DeleteStreamResult; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import com.amazonaws.services.kinesis.model.PutRecordsResult; -import org.apache.camel.kafkaconnector.aws.common.AWSCommon; +import com.amazonaws.services.kinesis.model.ResourceInUseException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.apache.camel.kafkaconnector.aws.services.AWSService; import org.apache.camel.kafkaconnector.aws.services.AWSServiceFactory; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; @@ -54,9 +56,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @RegisterExtension public static AWSService<AmazonKinesis> service = AWSServiceFactory.createKinesisService(); - + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); + private static final String KINESIS_STREAM_BASE_NAME = "ckc-kin-stream"; + private String streamName; + private AmazonKinesis awsKinesisClient; private volatile int received; @@ -67,13 +72,8 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { return new String[] {"camel-aws-kinesis-kafka-connector"}; } - - @BeforeEach - public void setUp() { - awsKinesisClient = service.getClient(); - received = 0; - - CreateStreamResult result = 
awsKinesisClient.createStream(AWSCommon.DEFAULT_KINESIS_STREAM, 1); + private void doCreateStream() { + CreateStreamResult result = awsKinesisClient.createStream(streamName, 1); if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) { fail("Failed to create the stream"); } else { @@ -81,30 +81,67 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { } } - @AfterEach - public void tearDown() { - DeleteStreamResult result = awsKinesisClient.deleteStream(AWSCommon.DEFAULT_KINESIS_STREAM); + private void createStream() { + try { + LOG.info("Checking whether the stream exists already"); + DescribeStreamResult describeStreamResult = awsKinesisClient.describeStream(streamName); + + int status = describeStreamResult.getSdkHttpMetadata().getHttpStatusCode(); + LOG.info("Kinesis stream check result: {}", status); + } catch (ResourceNotFoundException e) { + LOG.info("The stream does not exist, auto creating it ..."); + doCreateStream(); + } + } + + private void doDeleteStream() { + DeleteStreamResult result = awsKinesisClient.deleteStream(streamName); if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) { fail("Failed to delete the stream"); } else { - try { - // Because of the latency used to simulate the Kinesis API call (defined by the KINESIS_LATENCY) in - // the LocalStack configuration, the test needs to wait at least the same amount of time as set there - // in order to proceed. Otherwise the it fails to create the stream in the setUp phase. 
- // Ref.: https://github.com/localstack/localstack/issues/231#issuecomment-319959693 - Thread.sleep(500); - LOG.info("Stream deleted successfully"); - } catch (InterruptedException e) { - fail("Test interrupted while waiting for the stream to cool down"); - } + LOG.info("Stream deleted successfully"); } + } + + private void deleteStream() { + try { + LOG.info("Checking whether the stream exists already"); + DescribeStreamResult describeStreamResult = awsKinesisClient.describeStream(streamName); + + int status = describeStreamResult.getSdkHttpMetadata().getHttpStatusCode(); + LOG.info("Kinesis stream check result: {}", status); + doDeleteStream(); + } catch (ResourceNotFoundException e) { + LOG.info("The stream does not exist, skipping deletion"); + } catch (ResourceInUseException e) { + LOG.info("The stream exist but cannot be deleted because it's in use"); + doDeleteStream(); + } + } + + + @BeforeEach + public void setUp() { + streamName = KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100); + + awsKinesisClient = service.getClient(); + received = 0; + + createStream(); + } + + + @AfterEach + public void tearDown() { + deleteStream(); awsKinesisClient.shutdown(); deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } + private boolean checkRecord(ConsumerRecord<String, String> record) { LOG.debug("Received: {}", record.value()); received++; @@ -139,11 +176,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) .withAmazonConfig(service.getConnectionProperties()) .withConfiguration(TestKinesisConfiguration.class.getName()) - .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM); + .withStreamName(streamName); runtTest(connectorPropertyFactory); } + @Test @Timeout(120) public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { @@ -152,7 +190,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { 
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) .withAmazonConfig(service.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE) .withConfiguration(TestKinesisConfiguration.class.getName()) - .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM); + .withStreamName(streamName); runtTest(connectorPropertyFactory); } @@ -165,7 +203,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) .withAmazonConfig(service.getConnectionProperties()) .withConfiguration(TestKinesisConfiguration.class.getName()) - .withUrl(AWSCommon.DEFAULT_KINESIS_STREAM) + .withUrl(streamName) .buildUrl(); runtTest(connectorPropertyFactory); @@ -173,7 +211,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { private void putRecords() { PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(AWSCommon.DEFAULT_KINESIS_STREAM); + putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();