This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit cc35b51880cbe5214e9cbdac8831e91d645e1720 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 10:54:01 2021 +0100 Convert the AWS v1 tests to the new reusable sink test base class --- .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 131 +++++++++------------ .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 130 ++++++++------------ 2 files changed, 102 insertions(+), 159 deletions(-) diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java index 9d7cc80..8d893c9 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java @@ -18,19 +18,16 @@ package org.apache.camel.kafkaconnector.aws.v1.sns.sink; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.sqs.model.Message; import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; @@ -52,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { +public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { @RegisterExtension public static AWSService service = AWSServiceFactory.createSNSService(); @@ -61,6 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { private AWSSQSClient awsSqsClient; private String sqsQueueUrl; private String queueName; + private String topicName; private volatile int received; private final int expect = 10; @@ -72,16 +70,31 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); + topicName = getTopicForTest(this); + awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000); sqsQueueUrl = awsSqsClient.getQueue(queueName); LOG.info("Created SQS queue {}", sqsQueueUrl); - received = 0; } + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(120, TimeUnit.SECONDS)) { + assertEquals(expect, received, + "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + private boolean checkMessages(List<Message> messages) { for (Message message : messages) { LOG.info("Received: {}", message.getBody()); @@ -96,7 +109,8 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { return true; } - private void consumeMessages(CountDownLatch latch) { + @Override + protected void consumeMessages(CountDownLatch latch) { try { awsSqsClient.receiveFrom(sqsQueueUrl, this::checkMessages); } catch (Throwable t) { @@ -107,92 +121,53 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { } } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) - throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - ExecutorService service = Executors.newCachedThreadPool(); - - LOG.debug("Creating the consumer ..."); - CountDownLatch latch = new CountDownLatch(1); - service.submit(() -> consumeMessages(latch)); - - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); - } + @Test + @Timeout(value = 90) + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); - LOG.debug("Created the consumer ... About to receive messages"); + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() + .withName("CamelAWSSNSSinkConnectorDefault") + .withTopics(topicName) + .withTopicOrArn(queueName) + .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) + .withAmazonConfig(amazonProperties); - if (latch.await(120, TimeUnit.SECONDS)) { - assertEquals(expect, received, - "Didn't process the expected amount of messages: " + received + " != " + expect); - } else { - fail("Failed to receive the messages within the specified time"); - } + runTest(connectorPropertyFactory, topicName, expect); } @Test @Timeout(value = 90) - public void testBasicSendReceive() { - try { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkConnectorDefault") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName) - .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) - .withAmazonConfig(amazonProperties); - - runTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("Amazon SNS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } - } + public void testBasicSendReceiveUsingKafkaStyle() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); - @Test - @Timeout(value = 90) - public void testBasicSendReceiveUsingKafkaStyle() { - try { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkKafkaStyleConnector") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName) - .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) - .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE); - - runTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("Amazon SNS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() + .withName("CamelAWSSNSSinkKafkaStyleConnector") + .withTopics(topicName) + .withTopicOrArn(queueName) + .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) + .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE); + + runTest(connectorPropertyFactory, topicName, expect); } @Disabled("AWS SNS component is failing to parse the sink URL for this one") @Test @Timeout(value = 90) - public void testBasicSendReceiveUsingUrl() { - try { - Properties amazonProperties = service.getConnectionProperties(); + public void testBasicSendReceiveUsingUrl() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkKafkaStyleConnector") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName) + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() + .withName("CamelAWSSNSSinkKafkaStyleConnector") + .withTopics(topicName) + .withUrl(queueName) .append("queueUrl", sqsQueueUrl).append("subscribeSNStoSQS", "true") .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())) - .append("configuration", "#class:" + TestSNSConfiguration.class.getName()).buildUrl(); + .append("configuration", "#class:" + TestSNSConfiguration.class.getName()) + .buildUrl(); - runTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("Amazon SNS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + runTest(connectorPropertyFactory, topicName, expect); } } diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java index 45028ae..b38441f 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java @@ -18,18 +18,16 @@ package org.apache.camel.kafkaconnector.aws.v1.sqs.sink; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.sqs.model.Message; import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; @@ -53,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { +public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport { @RegisterExtension public static AWSService awsService = AWSServiceFactory.createSQSService(); @@ -62,6 +60,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { private AWSSQSClient awssqsClient; private String queueName; private String queueUrl; + private String topicName; private volatile int received; private final int expect = 10; @@ -73,6 +72,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); @@ -90,6 +90,22 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + assertEquals(expect, received, + "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received, + expect)); + } + } + private boolean checkMessages(List<Message> messages) { for (Message message : messages) { LOG.info("Received: {}", message.getBody()); @@ -104,7 +120,8 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { return true; } - private void consumeMessages(CountDownLatch latch) { + @Override + protected void consumeMessages(CountDownLatch latch) { try { awssqsClient.receiveFrom(queueUrl, this::checkMessages); } catch (Throwable t) { @@ -114,104 +131,55 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } - private void produceMessages() { - try { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); - } - } catch (Throwable t) { - LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t); - fail(String.format("Unable to publish messages to the broker: %s", t.getMessage())); - } - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); - - LOG.debug("Creating the consumer ..."); - ExecutorService service = Executors.newCachedThreadPool(); - - CountDownLatch latch = new CountDownLatch(1); - service.submit(() -> consumeMessages(latch)); - - LOG.debug("Creating the producer and sending messages ..."); - produceMessages(); - - LOG.debug("Waiting for the test to complete"); - if (latch.await(110, TimeUnit.SECONDS)) { - assertEquals(expect, received, - "Didn't process the expected amount of messages: " + received + " != " + expect); - } else { - fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received, - expect)); - } - } - @Test @Timeout(value = 120) - public void testBasicSendReceive() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorSpringBootStyle") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAmazonConfig(amazonProperties) - .withQueueNameOrArn(queueName); + ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() + .withName("CamelAwssqsSinkConnectorSpringBootStyle") + .withTopics(topicName) + .withAmazonConfig(amazonProperties) + .withQueueNameOrArn(queueName); + + runTest(testProperties, topicName, expect); - runTest(testProperties); - } catch (Exception e) { - LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } } @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") @Timeout(value = 120) @RepeatedTest(3) - public void testBasicSendReceiveUsingKafkaStyle() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); + public void testBasicSendReceiveUsingKafkaStyle() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorKafkaStyle") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) - .withQueueNameOrArn(queueName); + ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() + .withName("CamelAwssqsSinkConnectorKafkaStyle") + .withTopics(topicName) + .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) + .withQueueNameOrArn(queueName); - runTest(testProperties); - - } catch (Exception e) { - LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + runTest(testProperties, topicName, expect); } @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") @Timeout(value = 120) @RepeatedTest(3) - public void testBasicSendReceiveUsingUrl() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); + public void testBasicSendReceiveUsingUrl() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorUsingUrl") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName) + ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() + .withName("CamelAwssqsSinkConnectorUsingUrl") + .withTopics(topicName) + .withUrl(queueName) .append("autoCreateQueue", "true") .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())) - .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)).buildUrl(); + .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)) + .buildUrl(); - runTest(testProperties); - - } catch (Exception e) { - LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + runTest(testProperties, topicName, expect); } }