This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

The following commit(s) were added to refs/heads/master by this push: new 9957652 Fixes AWS test cases when running with remote AWS services new a27894e Merge pull request #308 from orpiske/fix-aws-cases-remote-execution 9957652 is described below commit 995765277c241d5012bee4b4952b792bd054d8e8 Author: Otavio R. Piske <angusyo...@gmail.com> AuthorDate: Fri Jun 26 22:05:27 2020 +0200 Fixes AWS test cases when running with remote AWS services --- .../aws/s3/source/CamelSourceAWSS3ITCase.java | 2 +- .../aws/sqs/sink/CamelSinkAWSSQSITCase.java | 19 ++++++++++++------- .../aws/sqs/source/CamelSourceAWSSQSITCase.java | 12 +++++++----- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java index 9f4e657..e9538e1 100644 --- a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java @@ -170,7 +170,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .withUrl(AWSCommon.DEFAULT_S3_BUCKET) .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) - .append("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) + .appendIfAvailable("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())) .buildUrl(); diff --git a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java index d3cb6c7..931b4a1 100644 --- 
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); private AWSSQSClient awssqsClient; + private String queueName; private volatile int received; private final int expect = 10; @@ -69,7 +71,9 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { public void setUp() { awssqsClient = awsService.getClient(); - String queueUrl = awssqsClient.getQueue(AWSCommon.DEFAULT_SQS_QUEUE); + queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" + TestUtils.randomWithRange(0, 1000); + String queueUrl = awssqsClient.getQueue(queueName); + LOG.debug("Using queue {} for the test", queueUrl); received = 0; @@ -78,7 +82,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) { + if (!awssqsClient.deleteQueue(queueName)) { fail("Failed to delete queue"); } } @@ -100,7 +104,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { private void consumeMessages(CountDownLatch latch) { try { - awssqsClient.receive(AWSCommon.DEFAULT_SQS_QUEUE, this::checkMessages); + awssqsClient.receive(queueName, this::checkMessages); } catch (Throwable t) { LOG.error("Failed to consume messages: {}", t.getMessage(), t); } finally { @@ -147,7 
+151,6 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @Test @Timeout(value = 120) - @RepeatedTest(3) public void testBasicSendReceive() { try { Properties amazonProperties = awsService.getConnectionProperties(); @@ -157,7 +160,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .withName("CamelAwssqsSinkConnectorSpringBootStyle") .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) .withAmazonConfig(amazonProperties) - .withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE); + .withQueueNameOrArn(queueName); runTest(testProperties); @@ -167,6 +170,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } + @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") @Test @Timeout(value = 120) @RepeatedTest(3) @@ -179,7 +183,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .withName("CamelAwssqsSinkConnectorKafkaStyle") .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) - .withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE); + .withQueueNameOrArn(queueName); runTest(testProperties); @@ -189,6 +193,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } + @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") @Test @Timeout(value = 120) @RepeatedTest(3) @@ -200,7 +205,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .basic() .withName("CamelAwssqsSinkConnectorUsingUrl") .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl(AWSCommon.DEFAULT_SQS_QUEUE) + .withUrl(queueName) .append("autoCreateQueue", "true") .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) diff --git a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java 
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java index 634a6e7..b866ee7 100644 --- a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java @@ -52,6 +52,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); private AWSSQSClient awssqsClient; + private String queueName; private volatile int received; private final int expect = 10; @@ -64,6 +65,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { awssqsClient = service.getClient(); + queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" + TestUtils.randomWithRange(0, 1000); received = 0; } @@ -71,7 +73,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { public void tearDown() { deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) { + if (!awssqsClient.deleteQueue(queueName)) { fail("Failed to delete queue"); } } @@ -93,7 +95,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { LOG.debug("Sending SQS messages"); for (int i = 0; i < expect; i++) { - awssqsClient.send(AWSCommon.DEFAULT_SQS_QUEUE, "Source test message " + i); + awssqsClient.send(queueName, "Source test message " + i); } LOG.debug("Done sending SQS messages"); @@ -111,7 +113,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE) + .withQueueOrArn(queueName) .withAmazonConfig(service.getConnectionProperties()); runTest(connectorPropertyFactory); @@ -126,7 +128,7 @@ public 
class CamelSourceAWSSQSITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE) + .withQueueOrArn(queueName) .withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE); runTest(connectorPropertyFactory); @@ -143,7 +145,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl(AWSCommon.DEFAULT_SQS_QUEUE) + .withUrl(queueName) .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))