This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push: new fc78b55 Added AWS v2 SNS sink integration test fc78b55 is described below commit fc78b554ccd467a09a3081f634aa3cdcac3ab8d7 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Nov 9 15:59:52 2020 +0100 Added AWS v2 SNS sink integration test --- tests/itests-aws-v2/pom.xml | 5 + .../v2/sns/sink/CamelAWSSNSPropertyFactory.java | 89 +++++++++++ .../aws/v2/sns/sink/CamelSinkAWSSNSITCase.java | 172 +++++++++++++++++++++ .../aws/v2/sns/sink/TestSnsConfiguration.java | 40 +++++ 4 files changed, 306 insertions(+) diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml index 62048dc..84b13e8 100644 --- a/tests/itests-aws-v2/pom.xml +++ b/tests/itests-aws-v2/pom.xml @@ -87,6 +87,11 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-aws2-kms</artifactId> </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws2-sns</artifactId> + </dependency> </dependencies> <build> diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java new file mode 100644 index 0000000..0fafbfb --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.sns.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.test.infra.aws.common.AWSConfigs; +import software.amazon.awssdk.regions.Region; + +/** + * Creates the set of properties used by a Camel JMS Sink Connector + */ +final class CamelAWSSNSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSNSPropertyFactory> { + public static final Map<String, String> SPRING_STYLE = new HashMap<>(); + public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); + + static { + SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.accessKey"); + SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secretKey"); + SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region"); + + KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.access-key"); + KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secret-key"); + KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region"); + } + + private CamelAWSSNSPropertyFactory() { + } + + public EndpointUrlBuilder<CamelAWSSNSPropertyFactory> withUrl(String topicOrArn) { + String sinkUrl = String.format("aws2-sns:%s", topicOrArn); + + return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl); + } + + public CamelAWSSNSPropertyFactory withTopicOrArn(String topicOrArn) { + return setProperty("camel.sink.path.topicNameOrArn", topicOrArn); + } + + public CamelAWSSNSPropertyFactory withSubscribeSNStoSQS(String queue) { + return setProperty("camel.sink.endpoint.subscribeSNStoSQS", "true").setProperty("camel.sink.endpoint.queueUrl", + queue); + } + + public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs) { + return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); + } + + public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { + String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); + String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); + String regionKey = style.get(AWSConfigs.REGION); + + setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); + setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); + return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id())); + } + + public CamelAWSSNSPropertyFactory withConfiguration(String configurationClass) { + return setProperty("camel.component.aws2-sns.configuration", classRef(configurationClass)); + } + + public static CamelAWSSNSPropertyFactory basic() { + return new CamelAWSSNSPropertyFactory().withName("CamelAWS2SNSSinkConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java new file mode 100644 index 0000000..f78fc68 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.sns.sink; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.AWSCommon; +import org.apache.camel.test.infra.aws.common.AWSConfigs; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.model.Message; + +import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory.classRef; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") +public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { + @RegisterExtension + public static AWSService service = AWSServiceFactory.createSNSService(); + + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class); + + private AWSSQSClient awsSqsClient; + private String sqsQueueUrl; + private String queueName; + + private volatile int received; + private final int expect = 10; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-sns-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + awsSqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient()); + + queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000); + sqsQueueUrl = awsSqsClient.createQueue(queueName); + + LOG.info("Created SQS queue {}", sqsQueueUrl); + + received = 0; + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(120, TimeUnit.SECONDS)) { + assertEquals(expect, received, + "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + private boolean checkMessages(List<Message> messages) { + for (Message message : messages) { + LOG.info("Received: {}", message.body()); + + received++; + } + + if (received == expect) { + return false; + } + + return true; + } + + protected void consumeMessages(CountDownLatch latch) { + try { + awsSqsClient.receive(sqsQueueUrl, this::checkMessages); + } catch (Throwable t) { + LOG.error("Failed to consume messages: {}", t.getMessage(), t); + fail(t.getMessage()); + } finally { + latch.countDown(); + } + } + + @Test + @Timeout(value = 90) + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); + String topicName = getTopicForTest(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory + .basic() + .withName("CamelAWSSNSSinkConnectorDefault") + .withTopics(topicName) + .withTopicOrArn(queueName) + .withSubscribeSNStoSQS(sqsQueueUrl) + .withConfiguration(TestSnsConfiguration.class.getName()) + .withAmazonConfig(amazonProperties); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(value = 90) + public void testBasicSendReceiveUsingKafkaStyle() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); + String topicName = getTopicForTest(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory + .basic() + .withName("CamelAWSSNSSinkKafkaStyleConnector") + .withTopics(topicName) + .withTopicOrArn(queueName) + .withSubscribeSNStoSQS(sqsQueueUrl) + .withConfiguration(TestSnsConfiguration.class.getName()) + .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(value = 90) + public void testBasicSendReceiveUsingUrl() throws Exception { + Properties amazonProperties = service.getConnectionProperties(); + String topicName = getTopicForTest(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory + .basic() + .withName("CamelAWSSNSSinkKafkaStyleConnectorWithUrl") + .withTopics(topicName) + .withUrl(queueName) + .append("queueUrl", sqsQueueUrl) + .append("subscribeSNStoSQS", "true") + .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id())) + .append("configuration", classRef(TestSnsConfiguration.class.getName())) + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); + + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java new file mode 100644 index 0000000..8dbc040 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.sns.sink; + +import org.apache.camel.component.aws2.sns.Sns2Configuration; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import software.amazon.awssdk.services.sns.SnsClient; + +public class TestSnsConfiguration extends Sns2Configuration { + private SnsClient snsClient; + + public TestSnsConfiguration() { + snsClient = AWSSDKClientUtils.newSNSClient(); + } + + @Override + public void setAmazonSNSClient(SnsClient amazonSNSClient) { + // NO-OP + } + + @Override + public SnsClient getAmazonSNSClient() { + return snsClient; + } +}