This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 658da77 Added support for remote AWS testing new 703f37c Merge pull request #107 from orpiske/decouple-aws 658da77 is described below commit 658da770547698d4cb2da8230b2e2675776d1d6d Author: Otavio R. Piske <angusyo...@gmail.com> AuthorDate: Sat Feb 29 15:16:13 2020 +0100 Added support for remote AWS testing Includes: - Added remote testing support for AWS SNS sink - Added remote testing support for AWS SQS source/sink - Added remote testing support for AWS S3 source - Added remote testing support for AWS Kinesis source - Updated documentation for remote testing --- README.adoc | 30 +++- .../apache/camel/kafkaconnector/ContainerUtil.java | 53 ------- .../{ => clients/aws}/AWSConfigs.java | 4 +- .../clients/aws/sqs/AWSSQSClient.java | 11 +- .../aws/sqs/TestAWSCredentialsProvider.java | 69 +++++++++ .../services/aws/AWSClientUtils.java | 168 +++++++++++++++++++++ .../aws/AWSKinesisLocalContainerService.java | 57 +++++++ .../services/aws/AWSLocalContainerService.java | 104 +++++++++++++ .../services/aws/AWSRemoteService.java | 80 ++++++++++ .../services/aws/AWSS3LocalContainerService.java | 57 +++++++ .../services/aws/AWSSNSLocalContainerService.java | 56 +++++++ .../services/aws/AWSSQSLocalContainerService.java | 54 +++++++ .../kafkaconnector/services/aws/AWSService.java | 55 +++++++ .../services/aws/AWSServiceFactory.java | 97 ++++++++++++ .../sink/aws/sns/CamelAWSSNSPropertyFactory.java | 2 +- .../sink/aws/sns/CamelSinkAWSSNSITCase.java | 33 ++-- .../sink/aws/sns/TestSNSConfiguration.java | 70 +-------- .../sink/aws/sqs/CamelAWSSQSPropertyFactory.java | 27 +++- .../sink/aws/sqs/CamelSinkAWSSQSITCase.java | 25 ++- .../kinesis/CamelAWSKinesisPropertyFactory.java | 2 +- .../aws/kinesis/CamelSourceAWSKinesisITCase.java | 38 +---- .../aws/kinesis/TestKinesisConfiguration.java | 52 +------ .../source/aws/s3/CamelAWSS3PropertyFactory.java | 2 +- .../source/aws/s3/CamelSourceAWSS3ITCase.java | 37 +---- .../source/aws/s3/TestS3Configuration.java | 53 +------ .../source/aws/sqs/CamelAWSSQSPropertyFactory.java | 25 ++- .../source/aws/sqs/CamelSourceAWSSQSITCase.java | 27 ++-- 27 files changed, 927 insertions(+), 361 deletions(-) diff --git a/README.adoc b/README.adoc index d9298ca..97693db 100644 --- a/README.adoc +++ b/README.adoc @@ -34,12 +34,40 @@ To run the integration tests it is required to: mvn -DskipIntegrationTests=false clean verify package ---- -It is also possible to point the tests to use an external Kafka broker. To do so, run the tests using: +It is also possible to point the tests to use an external services. To do so, you must set +properties for the services that you want to run. This causes the tests to not launch the local +container and use existing remote instances. At the moment, the following properties can be set +for remote testing: + +* kafka.instance.type +** kafka.bootstrap.servers +* aws-service.instance.type +** access.key: AWS access key (mandatory for remote testing) +** secret.key: AWS secret key (mandatory for remote testing) +** aws.region: AWS region (optional) +** aws.host: AWS host (optional) +* aws-service.kinesis.instance.type +** access.key: AWS access key (mandatory for remote testing) +** secret.key: AWS secret key (mandatory for remote testing) +** aws.region: AWS region (optional) +** aws.host: AWS host (optional) +* elasticsearch.instance.type +** elasticsearch.host +** elasticsearch.port +* cassandra.instance.type +** cassandra.host +** cassandra.cql3.port +* jms-service.instance.type +** jms.broker.address + ---- mvn -Dkafka.bootstrap.servers=host1:port -Dkafka.instance.type=remote -DskipIntegrationTests=false clean verify package ---- +It's possible to use a properties file to set these properties. To do so use `-Dtest.properties=/path/to/file.properties`. + + === Try it out locally You can use Camel Kafka connectors with local Apache Kafka installation. diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java b/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java deleted file mode 100644 index ee5e84e..0000000 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector; - -import java.util.Properties; - -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.regions.Regions; -import org.testcontainers.containers.localstack.LocalStackContainer; - - -public final class ContainerUtil { - - private ContainerUtil() { - } - - - public static Properties setupAWSConfigs(LocalStackContainer container, int service) { - Properties properties = new Properties(); - - final String amazonAWSHost = "localhost:" + container.getMappedPort(service); - properties.put(AWSConfigs.AMAZON_AWS_HOST, amazonAWSHost); - System.setProperty(AWSConfigs.AMAZON_AWS_HOST, amazonAWSHost); - - AWSCredentials credentials = container.getDefaultCredentialsProvider().getCredentials(); - - properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId()); - System.setProperty(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId()); - - properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey()); - System.setProperty(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey()); - - properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name()); - System.setProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()); - - return properties; - } -} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java similarity index 91% rename from tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java rename to tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java index f5da049..5038542 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.kafkaconnector; +package org.apache.camel.kafkaconnector.clients.aws; public final class AWSConfigs { @@ -24,6 +23,7 @@ public final class AWSConfigs { public static final String REGION = "aws.region"; public static final String AMAZON_AWS_HOST = "aws.host"; public static final String AMAZON_AWS_SNS_2_SQS_QUEUE_URL = "aws.sns.2.sqs.queue.url"; + public static final String PROTOCOL = "aws.protocol"; private AWSConfigs() { } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java index 161fc3e..cb4539d 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.function.Predicate; import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; @@ -31,7 +30,6 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.amazonaws.services.sqs.model.SendMessageRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; public class AWSSQSClient { private static final Logger LOG = LoggerFactory.getLogger(AWSSQSClient.class); @@ -40,13 +38,8 @@ public class AWSSQSClient { private int maxWaitTime = 10; private int maxNumberOfMessages = 1; - public AWSSQSClient(LocalStackContainer localStackContainer) { - sqs = AmazonSQSClientBuilder - .standard() - .withEndpointConfiguration(localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.SQS)) - .withCredentials(localStackContainer.getDefaultCredentialsProvider()) - .build(); + public AWSSQSClient(AmazonSQS sqs) { + this.sqs = sqs; } public String getQueue(String queue) { diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java new file mode 100644 index 0000000..e348f40 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.clients.aws.sqs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; + +public class TestAWSCredentialsProvider implements AWSCredentialsProvider { + private static class TestAWSCredentials implements AWSCredentials { + private final String accessKey; + private final String secretKey; + + + public TestAWSCredentials() { + this(System.getProperty(AWSConfigs.ACCESS_KEY), System.getProperty(AWSConfigs.SECRET_KEY)); + } + + public TestAWSCredentials(String accessKey, String secretKey) { + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + @Override + public String getAWSAccessKeyId() { + return accessKey; + } + + @Override + public String getAWSSecretKey() { + return secretKey; + } + }; + + private AWSCredentials credentials; + + public TestAWSCredentialsProvider() { + credentials = new TestAWSCredentials(); + } + + + public TestAWSCredentialsProvider(String accessKey, String secretKey) { + credentials = new TestAWSCredentials(accessKey, secretKey); + } + + @Override + public AWSCredentials getCredentials() { + return credentials; + } + + @Override + public void refresh() { + + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java new file mode 100644 index 0000000..625bc04 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSClientBuilder; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; +import org.apache.camel.kafkaconnector.clients.aws.sqs.TestAWSCredentialsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class AWSClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(AWSClientUtils.class); + + private AWSClientUtils() { + } + + private static String getRegion() { + String regionStr = System.getProperty(AWSConfigs.REGION); + String region; + + if (regionStr != null && !regionStr.isEmpty()) { + region = Regions.valueOf(regionStr).getName(); + } else { + region = Regions.US_EAST_1.getName(); + } + + return region; + } + + + public static AmazonSNS newSNSClient() { + LOG.debug("Creating a custom SNS client for running a AWS SNS test"); + AmazonSNSClientBuilder clientBuilder = AmazonSNSClientBuilder + .standard(); + + String awsInstanceType = System.getProperty("aws-service.instance.type"); + String region = getRegion(); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + clientBuilder + .withClientConfiguration(clientConfiguration) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) + .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey")); + } else { + clientBuilder + .withRegion(region) + .withCredentials(new TestAWSCredentialsProvider()); + } + + return clientBuilder.build(); + } + + public static AmazonSQS newSQSClient() { + LOG.debug("Creating a custom SQS client for running a AWS SNS test"); + AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder + .standard(); + + String awsInstanceType = System.getProperty("aws-service.instance.type"); + String region = getRegion(); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + clientBuilder + .withClientConfiguration(clientConfiguration) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) + .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey")); + } else { + clientBuilder + .withRegion(region) + .withCredentials(new TestAWSCredentialsProvider()); + } + + + + return clientBuilder.build(); + } + + public static AmazonS3 newS3Client() { + LOG.debug("Creating a new S3 client"); + AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard(); + + String awsInstanceType = System.getProperty("aws-service.instance.type"); + String region = getRegion(); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + clientBuilder + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey")); + } else { + clientBuilder + .withRegion(region) + .withCredentials(new TestAWSCredentialsProvider()); + } + + clientBuilder + .withPathStyleAccessEnabled(true); + + return clientBuilder.build(); + } + + public static AmazonKinesis newKinesisClient() { + LOG.debug("Creating a new AWS Kinesis client"); + AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); + + String awsInstanceType = System.getProperty("aws-service.kinesis.instance.type"); + String region = getRegion(); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); + + LOG.debug("Creating a new AWS Kinesis client to access {}", amazonHost); + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + clientBuilder + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) + .withClientConfiguration(clientConfiguration) + .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey")); + } else { + clientBuilder + .withRegion(region) + .withCredentials(new TestAWSCredentialsProvider()); + } + + return clientBuilder.build(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java new file mode 100644 index 0000000..6a5353b --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import org.testcontainers.containers.localstack.LocalStackContainer; + +public class AWSKinesisLocalContainerService extends AWSLocalContainerService<AmazonKinesis> { + + public AWSKinesisLocalContainerService() { + super(LocalStackContainer.Service.KINESIS); + } + + @Override + public String getServiceEndpoint() { + return super.getServiceEndpoint(LocalStackContainer.Service.KINESIS); + } + + @Override + public String getAmazonHost() { + final int kinesisPort = 4568; + + return "localhost:" + getContainer().getMappedPort(kinesisPort); + } + + + @Override + public AmazonKinesis getClient() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + return AmazonKinesisClientBuilder + .standard() + .withEndpointConfiguration(getContainer().getEndpointConfiguration(LocalStackContainer.Service.KINESIS)) + .withCredentials(getContainer().getDefaultCredentialsProvider()) + .withClientConfiguration(clientConfiguration) + .build(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java new file mode 100644 index 0000000..596585b --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import java.util.Properties; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.regions.Regions; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.localstack.LocalStackContainer; + +abstract class AWSLocalContainerService<T> implements AWSService<T> { + private static final Logger LOG = LoggerFactory.getLogger(AWSLocalContainerService.class); + private final LocalStackContainer container; + + public AWSLocalContainerService(LocalStackContainer.Service...services) { + this.container = new LocalStackContainer().withServices(services); + + container.start(); + } + + + protected abstract String getAmazonHost(); + + protected abstract String getServiceEndpoint(); + + @Override + public void initialize() { + LOG.info("AWS service running at address {}", getServiceEndpoint()); + } + + @Override + public void shutdown() { + LOG.info("Stopping local AWS service"); + container.stop(); + } + + @Override + public AWSCredentials getCredentials() { + return container.getDefaultCredentialsProvider().getCredentials(); + } + + + @Override + public Properties getConnectionProperties() { + Properties properties = new Properties(); + + AWSCredentials credentials = getCredentials(); + + properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId()); + + properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey()); + + properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name()); + + properties.put(AWSConfigs.AMAZON_AWS_HOST, getAmazonHost()); + + /** + * We need to set this one. For some sets, when they instantiate the clients within + * Camel, they need to know what is the Amazon host being used (ie.: when creating them + * using the withEndpointConfiguration()). Because this happens within Camel, there's + * no way to pass that information easily. Therefore, the host is set as a property + * and read by whatever class/method creates the clients to pass to Camel. + * + * Do not unset. + */ + System.setProperty(AWSConfigs.AMAZON_AWS_HOST, getAmazonHost()); + + properties.put(AWSConfigs.PROTOCOL, "http"); + + return properties; + } + + protected LocalStackContainer getContainer() { + return container; + } + + protected String getAmazonHost(int port) { + return "localhost:" + container.getMappedPort(port); + } + + protected String getServiceEndpoint(LocalStackContainer.Service service) { + return container + .getEndpointConfiguration(service) + .getServiceEndpoint(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java new file mode 100644 index 0000000..2a4c23a --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import java.util.Properties; +import java.util.function.Supplier; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sqs.AmazonSQS; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; +import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; +import org.apache.camel.kafkaconnector.clients.aws.sqs.TestAWSCredentialsProvider; + +public class AWSRemoteService<T> implements AWSService<T> { + private static final AWSCredentialsProvider CREDENTIALS_PROVIDER = new TestAWSCredentialsProvider(); + private Supplier<T> remoteClientSupplier; + + public AWSRemoteService(Supplier<T> remoteClientSupplier) { + this.remoteClientSupplier = remoteClientSupplier; + } + + + @Override + public T getClient() { + return remoteClientSupplier.get(); + } + + @Override + public AWSCredentials getCredentials() { + return CREDENTIALS_PROVIDER.getCredentials(); + } + + @Override + public Properties getConnectionProperties() { + Properties properties = new Properties(); + + AWSCredentials credentials = getCredentials(); + + properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId()); + properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey()); + properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name()); + + return properties; + } + + @Override + public void initialize() { + + } + + @Override + public void shutdown() { + + } + + public static AWSSQSClient newSQSClient() { + AmazonSQS sqs = AWSClientUtils.newSQSClient(); + + return new AWSSQSClient(sqs); + } + + +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java new file mode 100644 index 0000000..ab7679b --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import org.testcontainers.containers.localstack.LocalStackContainer; + +public class AWSS3LocalContainerService extends AWSLocalContainerService<AmazonS3> { + + public AWSS3LocalContainerService() { + super(LocalStackContainer.Service.S3); + } + + + @Override + public String getServiceEndpoint() { + return super.getServiceEndpoint(LocalStackContainer.Service.S3); + } + + @Override + public String getAmazonHost() { + final int s3Port = 4572; + + return "localhost:" + getContainer().getMappedPort(s3Port); + } + + @Override + public AmazonS3 getClient() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProtocol(Protocol.HTTP); + + return AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(getContainer().getEndpointConfiguration(LocalStackContainer.Service.S3)) + .withCredentials(getContainer().getDefaultCredentialsProvider()) + .withClientConfiguration(clientConfiguration) + .build(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java new file mode 100644 index 0000000..8f1b8e0 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; +import org.testcontainers.containers.localstack.LocalStackContainer; + +public class AWSSNSLocalContainerService extends AWSLocalContainerService<AWSSQSClient> { + + public AWSSNSLocalContainerService() { + super(LocalStackContainer.Service.SQS, + LocalStackContainer.Service.SNS); + } + + @Override + public String getServiceEndpoint() { + return super.getServiceEndpoint(LocalStackContainer.Service.SNS); + } + + @Override + public String getAmazonHost() { + final int snsPort = 4575; + + return super.getAmazonHost(snsPort); + } + + + @Override + public AWSSQSClient getClient() { + AmazonSQS sqs = AmazonSQSClientBuilder + .standard() + .withEndpointConfiguration(getContainer() + .getEndpointConfiguration(LocalStackContainer.Service.SQS)) + .withCredentials(getContainer().getDefaultCredentialsProvider()) + .build(); + + return new AWSSQSClient(sqs); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java new file mode 100644 index 0000000..1e7ffd9 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; +import org.testcontainers.containers.localstack.LocalStackContainer; + +public class AWSSQSLocalContainerService extends AWSLocalContainerService<AWSSQSClient> { + + public AWSSQSLocalContainerService() { + super(LocalStackContainer.Service.SQS); + } + + @Override + public String getServiceEndpoint() { + return super.getServiceEndpoint(LocalStackContainer.Service.SQS); + } + + @Override + public String getAmazonHost() { + final int sqsPort = 4576; + + return super.getAmazonHost(sqsPort); + } + + @Override + public AWSSQSClient getClient() { + AmazonSQS sqs = AmazonSQSClientBuilder + .standard() + .withEndpointConfiguration(getContainer() + .getEndpointConfiguration(LocalStackContainer.Service.SQS)) + .withCredentials(getContainer().getDefaultCredentialsProvider()) + .build(); + + return new AWSSQSClient(sqs); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java new file mode 100644 index 0000000..ffff052 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import java.util.Properties; + +import com.amazonaws.auth.AWSCredentials; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface AWSService<T> extends BeforeAllCallback, AfterAllCallback { + + T getClient(); + + AWSCredentials getCredentials(); + + Properties getConnectionProperties(); + + /** + * Perform any initialization necessary + */ + void initialize(); + + /** + * Shuts down the service after the test has completed + */ + void shutdown(); + + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } + + @Override + default void afterAll(ExtensionContext extensionContext) throws Exception { + shutdown(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java new file mode 100644 index 0000000..ea12594 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.aws; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class AWSServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(AWSServiceFactory.class); + + private AWSServiceFactory() { + } + + public static AWSService createSQSService() { + String awsInstanceType = System.getProperty("aws-service.instance.type"); + LOG.info("Creating a {} AWS SQS instance", awsInstanceType); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + return new AWSSQSLocalContainerService(); + } + + if (awsInstanceType.equals("remote")) { + return new AWSRemoteService(AWSRemoteService::newSQSClient); + } + + LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'", + awsInstanceType); + throw new UnsupportedOperationException("Invalid AWS instance type"); + } + + public static AWSService createSNSService() { + String awsInstanceType = System.getProperty("aws-service.instance.type"); + LOG.info("Creating a {} AWS SNS instance", awsInstanceType); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + return new AWSSNSLocalContainerService(); + } + + if (awsInstanceType.equals("remote")) { + return new AWSRemoteService(AWSRemoteService::newSQSClient); + } + + LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'", + awsInstanceType); + throw new UnsupportedOperationException("Invalid AWS instance type"); + + } + + public static AWSService createKinesisService() { + String awsInstanceType = System.getProperty("aws-service.kinesis.instance.type"); + LOG.info("Creating a {} AWS kinesis instance", awsInstanceType); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + return new AWSKinesisLocalContainerService(); + } + + if (awsInstanceType.equals("remote")) { + return new AWSRemoteService(AWSClientUtils::newKinesisClient); + } + + LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'", + awsInstanceType); + throw new UnsupportedOperationException("Invalid AWS instance type"); + } + + public static AWSService createS3Service() { + String awsInstanceType = System.getProperty("aws-service.instance.type"); + LOG.info("Creating a {} AWS S3 instance", awsInstanceType); + + if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) { + return new AWSS3LocalContainerService(); + } + + if (awsInstanceType.equals("remote")) { + return new AWSRemoteService(AWSClientUtils::newS3Client); + } + + LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'", + awsInstanceType); + throw new UnsupportedOperationException("Invalid AWS instance type"); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java index 389ad98..d801d7d 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java @@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.sink.aws.sns; import java.util.Properties; -import org.apache.camel.kafkaconnector.AWSConfigs; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.kafka.connect.runtime.ConnectorConfig; /** diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java index 9495d96..d486c75 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java @@ -25,20 +25,20 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.model.Message; -import org.apache.camel.kafkaconnector.AWSConfigs; import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.ContainerUtil; import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.aws.AWSService; +import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -46,12 +46,10 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class); - private static final int SNS_PORT = 4575; + @RegisterExtension + public static AWSService<AWSSQSClient> service = AWSServiceFactory.createSNSService(); - @Container - public LocalStackContainer localStackContainer = new LocalStackContainer() - .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class); private AWSSQSClient awsSqsClient; @@ -60,20 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - final String sqsInstance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.SQS) - .getServiceEndpoint(); - - LOG.info("SQS instance running at {}", sqsInstance); - - awsSqsClient = new AWSSQSClient(localStackContainer); - - - final String snsInstance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.SNS) - .getServiceEndpoint(); - - LOG.info("SNS instance running at {}", snsInstance); + awsSqsClient = service.getClient(); } private boolean checkMessages(List<Message> messages) { @@ -110,7 +95,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { final String sqsQueue = awsSqsClient.getQueue(TestCommon.DEFAULT_SQS_QUEUE_FOR_SNS); LOG.info("Created SQS queue {}", sqsQueue); - Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SNS_PORT); + Properties properties = service.getConnectionProperties(); properties.put(AWSConfigs.AMAZON_AWS_SNS_2_SQS_QUEUE_URL, sqsQueue); ConnectorPropertyFactory testProperties = new CamelAWSSNSPropertyFactory(1, diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java index dc2fa49..11ded50 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java @@ -17,84 +17,20 @@ package org.apache.camel.kafkaconnector.sink.aws.sns; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Regions; import com.amazonaws.services.sns.AmazonSNS; -import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import org.apache.camel.component.aws.sns.SnsConfiguration; -import org.apache.camel.kafkaconnector.AWSConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils; public class TestSNSConfiguration extends SnsConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(TestSNSConfiguration.class); - private final String amazonHost; - private final String region; - - private class TestAWSCredentialsProvider implements AWSCredentialsProvider { - @Override - public AWSCredentials getCredentials() { - return new AWSCredentials() { - @Override - public String getAWSAccessKeyId() { - return System.getProperty(AWSConfigs.ACCESS_KEY); - } - - @Override - public String getAWSSecretKey() { - return System.getProperty(AWSConfigs.SECRET_KEY); - } - }; - } - - @Override - public void refresh() { - - } - } - - public TestSNSConfiguration() { - amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); - region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName(); - } @Override public AmazonSNS getAmazonSNSClient() { - LOG.debug("Creating a custom SNS client for running a AWS SNS test"); - AmazonSNSClientBuilder clientBuilder = AmazonSNSClientBuilder - .standard(); - - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - clientBuilder - .withClientConfiguration(clientConfiguration) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) - .withCredentials(new TestAWSCredentialsProvider()); - - return clientBuilder.build(); + return AWSClientUtils.newSNSClient(); } @Override public AmazonSQS getAmazonSQSClient() { - LOG.debug("Creating a custom SQS client for running a AWS SNS test"); - AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder - .standard(); - - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - clientBuilder - .withClientConfiguration(clientConfiguration) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) - .withCredentials(new TestAWSCredentialsProvider()); - - return clientBuilder.build(); + return AWSClientUtils.newSQSClient(); } } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java index cad1fcf..d4b52c5 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java @@ -19,8 +19,9 @@ package org.apache.camel.kafkaconnector.sink.aws.sqs; import java.util.Properties; -import org.apache.camel.kafkaconnector.AWSConfigs; +import com.amazonaws.regions.Regions; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.kafka.connect.runtime.ConnectorConfig; /** @@ -43,6 +44,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory { @Override public Properties getProperties() { Properties connectorProps = new Properties(); + connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSinkConnector"); connectorProps.put("tasks.max", String.valueOf(tasksMax)); @@ -50,8 +52,24 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory { connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - String queueUrl = "aws-sqs://" + queue + "?autoCreateQueue=true&accessKey=accesskey&secretKey=secretKey®ion=EU_WEST_1&protocol=http&amazonAWSHost=" - + amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "localhost"); + String accessKey = amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""); + String secretKey = amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""); + + String region = amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()); + + String queueUrl = String.format("aws-sqs://%s?autoCreateQueue=true&accessKey=%s&secretKey=%s®ion=%s", + queue, accessKey, secretKey, region); + + String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL); + if (protocol != null && !protocol.isEmpty()) { + queueUrl = String.format("%s&protocol=%s", queueUrl, protocol); + } + + String amazonAWSHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST); + if (amazonAWSHost != null && !amazonAWSHost.isEmpty()) { + queueUrl = String.format("%s&amazonAWSHost=%s", queueUrl, amazonAWSHost); + } + connectorProps.put("camel.sink.url", queueUrl); connectorProps.put("topics", topic); @@ -61,8 +79,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory { connectorProps.put("camel.component.aws-sqs.configuration.secret-key", amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - connectorProps.put("camel.component.aws-sqs.configuration.region", - amazonConfigs.getProperty(AWSConfigs.REGION, "")); + connectorProps.put("camel.component.aws-sqs.configuration.region", region); return connectorProps; } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java index 314690e..2a4946d 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java @@ -27,17 +27,18 @@ import java.util.concurrent.TimeUnit; import com.amazonaws.services.sqs.model.Message; import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.ContainerUtil; import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.aws.AWSService; +import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.fail; @@ -45,12 +46,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Testcontainers public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); - private static final int SQS_PORT = 4576; + @RegisterExtension + public static AWSService<AWSSQSClient> awsService = AWSServiceFactory.createSQSService(); - @Container - public LocalStackContainer localStackContainer = new LocalStackContainer() - .withServices(LocalStackContainer.Service.SQS); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); private AWSSQSClient awssqsClient; @@ -59,13 +58,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - final String sqsInstance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.SQS) - .getServiceEndpoint(); - - LOG.info("SQS instance running at {}", sqsInstance); - - awssqsClient = new AWSSQSClient(localStackContainer); + awssqsClient = awsService.getClient(); } private boolean checkMessages(List<Message> messages) { @@ -111,7 +104,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @Timeout(value = 120) public void testBasicSendReceive() { try { - Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SQS_PORT); + Properties properties = awsService.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelAWSSQSPropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_SQS_QUEUE, properties); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java index df7559b..bea1d09 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java @@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.source.aws.kinesis; import java.util.Properties; -import org.apache.camel.kafkaconnector.AWSConfigs; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.kafka.connect.runtime.ConnectorConfig; diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java index 61e5a9b..e75acb4 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java @@ -25,27 +25,24 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.model.CreateStreamResult; import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; import com.amazonaws.services.kinesis.model.PutRecordsResult; import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.ContainerUtil; import org.apache.camel.kafkaconnector.TestCommon; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.aws.AWSService; +import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.Assert.fail; @@ -53,12 +50,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { + @RegisterExtension + public static AWSService<AmazonKinesis> service = AWSServiceFactory.createKinesisService(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); - private static final int KINESIS_PORT = 4568; - - @Container - public LocalStackContainer localStackContainer = new LocalStackContainer() - .withServices(LocalStackContainer.Service.KINESIS); private AmazonKinesis awsKinesisClient; @@ -68,24 +63,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - if (!localStackContainer.isRunning()) { - LOG.info("Kinesis is not running"); - } - final String kinesisInstance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.KINESIS) - .getServiceEndpoint(); - - LOG.info("Kinesis instance running at {}", kinesisInstance); - - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - awsKinesisClient = AmazonKinesisClientBuilder - .standard() - .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)) - .withCredentials(localStackContainer.getDefaultCredentialsProvider()) - .withClientConfiguration(clientConfiguration) - .build(); + awsKinesisClient = service.getClient(); } private boolean checkRecord(ConsumerRecord<String, String> record) { @@ -102,7 +80,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @Test @Timeout(120) public void testBasicSendReceive() throws ExecutionException, InterruptedException { - Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, KINESIS_PORT); + Properties properties = service.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelAWSKinesisPropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_KINESIS_STREAM, properties); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java index 26939c8..b2a97be 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java @@ -17,64 +17,16 @@ package org.apache.camel.kafkaconnector.source.aws.kinesis; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import org.apache.camel.component.aws.kinesis.KinesisConfiguration; -import org.apache.camel.kafkaconnector.AWSConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils; public class TestKinesisConfiguration extends KinesisConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(TestKinesisConfiguration.class); - private final String amazonHost; - private final String region; - private AmazonKinesis amazonKinesis; - private class TestAWSCredentialsProvider implements AWSCredentialsProvider { - @Override - public AWSCredentials getCredentials() { - return new AWSCredentials() { - @Override - public String getAWSAccessKeyId() { - return System.getProperty(AWSConfigs.ACCESS_KEY); - } - - @Override - public String getAWSSecretKey() { - return System.getProperty(AWSConfigs.SECRET_KEY); - } - }; - } - - @Override - public void refresh() { - - } - } - - public TestKinesisConfiguration() { - amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); - region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName(); - } private AmazonKinesis buildClient() { - LOG.debug("Creating a new AWS Kinesis client"); - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - return AmazonKinesisClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) - .withCredentials(new TestAWSCredentialsProvider()) - .withClientConfiguration(clientConfiguration) - .build(); + return AWSClientUtils.newKinesisClient(); } @Override diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java index 28fda93..c1ecc67 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java @@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.source.aws.s3; import java.util.Properties; -import org.apache.camel.kafkaconnector.AWSConfigs; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.kafka.connect.runtime.ConnectorConfig; diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java index 74164b9..afdaee5 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java @@ -21,35 +21,30 @@ import java.io.File; import java.util.Properties; import java.util.concurrent.ExecutionException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.ContainerUtil; import org.apache.camel.kafkaconnector.TestCommon; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.aws.AWSService; +import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); - private static final int S3_PORT = 4572; + @RegisterExtension + public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service(); - @Container - public LocalStackContainer localStackContainer = new LocalStackContainer() - .withServices(LocalStackContainer.Service.S3); + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); private AmazonS3 awsS3Client; @@ -58,23 +53,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - final String s3Instance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.S3) - .getServiceEndpoint(); - - LOG.info("S3 instance running at {}", s3Instance); - - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - awsS3Client = AmazonS3ClientBuilder - .standard() - .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.S3)) - .withCredentials(localStackContainer.getDefaultCredentialsProvider()) - .withClientConfiguration(clientConfiguration) - - .build(); - + awsS3Client = service.getClient(); } private boolean checkRecord(ConsumerRecord<String, String> record) { @@ -91,7 +70,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @Test @Timeout(180) public void testBasicSendReceive() throws ExecutionException, InterruptedException { - Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, S3_PORT); + Properties properties = service.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelAWSS3PropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_S3_BUCKET, properties); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java index 05779ff..e2e78bb 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java @@ -17,65 +17,16 @@ package org.apache.camel.kafkaconnector.source.aws.s3; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import org.apache.camel.component.aws.s3.S3Configuration; -import org.apache.camel.kafkaconnector.AWSConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils; public class TestS3Configuration extends S3Configuration { - private static final Logger LOG = LoggerFactory.getLogger(TestS3Configuration.class); - private final String amazonHost; - private final String region; - private AmazonS3 amazonS3; - private class TestAWSCredentialsProvider implements AWSCredentialsProvider { - @Override - public AWSCredentials getCredentials() { - return new AWSCredentials() { - @Override - public String getAWSAccessKeyId() { - return System.getProperty(AWSConfigs.ACCESS_KEY); - } - - @Override - public String getAWSSecretKey() { - return System.getProperty(AWSConfigs.SECRET_KEY); - } - }; - } - - @Override - public void refresh() { - - } - } - - public TestS3Configuration() { - amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST); - region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName(); - } private AmazonS3 buildClient() { - LOG.debug("Creating a new S3 client"); - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setProtocol(Protocol.HTTP); - - return AmazonS3ClientBuilder - .standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region)) - .withCredentials(new TestAWSCredentialsProvider()) - .withClientConfiguration(clientConfiguration) - .withPathStyleAccessEnabled(true) - .build(); + return AWSClientUtils.newS3Client(); } @Override diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java index 9b374a1..5cb5b0f 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java @@ -19,8 +19,9 @@ package org.apache.camel.kafkaconnector.source.aws.sqs; import java.util.Properties; -import org.apache.camel.kafkaconnector.AWSConfigs; +import com.amazonaws.regions.Regions; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -45,6 +46,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory { public Properties getProperties() { Properties connectorProps = new Properties(); connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSourceConnector"); + connectorProps.put("tasks.max", String.valueOf(tasksMax)); connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector"); connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); @@ -52,10 +54,25 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory { connectorProps.put("camel.source.kafka.topic", topic); - String queueUrl = "aws-sqs://" + queue + "?autoCreateQueue=true&accessKey=accesskey&secretKey=secretKey®ion=EU_WEST_1&protocol=http&amazonAWSHost=" - + amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "localhost"); - connectorProps.put("camel.source.url", queueUrl); + String accessKey = amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""); + String secretKey = amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""); + + String region = amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()); + + String queueUrl = String.format("aws-sqs://%s?autoCreateQueue=true&accessKey=%s&secretKey=%s®ion=%s", + queue, accessKey, secretKey, region); + String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL); + if (protocol != null && !protocol.isEmpty()) { + queueUrl = String.format("%s&protocol=%s", queueUrl, protocol); + } + + String amazonAWSHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST); + if (amazonAWSHost != null && !amazonAWSHost.isEmpty()) { + queueUrl = String.format("%s&amazonAWSHost=%s", queueUrl, amazonAWSHost); + } + + connectorProps.put("camel.source.url", queueUrl); connectorProps.put("camel.component.aws-sqs.configuration.access-key", amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java index 9d22cc5..c8e6772 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java @@ -22,30 +22,29 @@ import java.util.concurrent.ExecutionException; import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.ContainerUtil; import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs; import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.aws.AWSService; +import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); - private static final int SQS_PORT = 4576; + @RegisterExtension + public static AWSService<AWSSQSClient> service = AWSServiceFactory.createSQSService(); - @Container - public LocalStackContainer localStackContainer = new LocalStackContainer() - .withServices(LocalStackContainer.Service.SQS); + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); private AWSSQSClient awssqsClient; @@ -54,13 +53,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - final String sqsInstance = localStackContainer - .getEndpointConfiguration(LocalStackContainer.Service.SQS) - .getServiceEndpoint(); - - LOG.info("SQS instance running at {}", sqsInstance); - - awssqsClient = new AWSSQSClient(localStackContainer); + awssqsClient = service.getClient(); } private boolean checkRecord(ConsumerRecord<String, String> record) { @@ -77,7 +70,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @Test @Timeout(90) public void testBasicSendReceive() throws ExecutionException, InterruptedException { - Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SQS_PORT); + Properties properties = service.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelAWSSQSPropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_SQS_QUEUE, @@ -87,7 +80,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { LOG.debug("Sending SQS messages"); for (int i = 0; i < expect; i++) { - awssqsClient.send(TestCommon.DEFAULT_SQS_QUEUE, "Test message " + i); + awssqsClient.send(TestCommon.DEFAULT_SQS_QUEUE, "Source test message " + i); } LOG.debug("Done sending SQS messages");