This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit bff8ad67adb9a8c93ab3b1ec349b741924ed2e67 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Nov 19 09:34:41 2020 +0100 Updated AWS test services according to interface changes from camel 3.7 --- .../aws/v1/clients/AWSClientUtils.java | 89 ---------------------- .../source/CamelSourceAWSKinesisITCase.java | 5 +- .../aws/v1/s3/source/CamelSourceAWSS3ITCase.java | 67 +++++++++++++++- .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 6 +- .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 6 +- .../aws/v1/sqs/source/CamelSourceAWSSQSITCase.java | 6 +- .../aws/v2/clients/AWSSDKClientUtils.java | 67 ---------------- .../source/CamelSourceAWSKinesisITCase.java | 5 +- .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java | 48 +++++++++++- .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 6 +- .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java | 6 +- 11 files changed, 128 insertions(+), 183 deletions(-) diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java deleted file mode 100644 index 93399a8..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.clients; - -import java.util.Iterator; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListVersionsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.amazonaws.services.s3.model.S3VersionSummary; -import com.amazonaws.services.s3.model.VersionListing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class AWSClientUtils { - private static final Logger LOG = LoggerFactory.getLogger(AWSClientUtils.class); - - private AWSClientUtils() { - } - - /** - * Delete an S3 bucket using the provided client. Coming from AWS documentation: - * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java - * - * @param s3Client - * the AmazonS3 client instance used to delete the bucket - * @param bucketName - * a String containing the bucket name - */ - public static void deleteBucket(AmazonS3 s3Client, String bucketName) { - // Delete all objects from the bucket. This is sufficient - // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts - // delete markers for all objects, but doesn't delete the object versions. - // To delete objects from versioned buckets, delete all of the object versions before deleting - // the bucket (see below for an example). - ObjectListing objectListing = s3Client.listObjects(bucketName); - while (true) { - Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator(); - while (objIter.hasNext()) { - s3Client.deleteObject(bucketName, objIter.next().getKey()); - } - - // If the bucket contains many objects, the listObjects() call - // might not return all of the objects in the first listing. Check to - // see whether the listing was truncated. If so, retrieve the next page of objects - // and delete them. - if (objectListing.isTruncated()) { - objectListing = s3Client.listNextBatchOfObjects(objectListing); - } else { - break; - } - } - - // Delete all object versions (required for versioned buckets). - VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName)); - while (true) { - Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator(); - while (versionIter.hasNext()) { - S3VersionSummary vs = versionIter.next(); - s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId()); - } - - if (versionList.isTruncated()) { - versionList = s3Client.listNextBatchOfVersions(versionList); - } else { - break; - } - } - - // After all objects and object versions are deleted, delete the bucket. - s3Client.deleteBucket(bucketName); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java index 0dad306..d16f23c 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -37,6 +37,7 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws.services.AWSServiceFactory; @@ -56,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<AmazonKinesis> service = AWSServiceFactory.createKinesisService(); + public static AWSService service = AWSServiceFactory.createKinesisService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); @@ -124,7 +125,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { public void setUp() { streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100); - awsKinesisClient = service.getClient(); + awsKinesisClient = AWSClientUtils.newKinesisClient(); received = 0; createStream(); diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java index 05e2319..21af556 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java @@ -18,16 +18,22 @@ package org.apache.camel.kafkaconnector.aws.v1.s3.source; import java.io.File; +import java.util.Iterator; import java.util.Properties; import java.util.concurrent.ExecutionException; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; -import org.apache.camel.kafkaconnector.aws.v1.clients.AWSClientUtils; +import com.amazonaws.services.s3.model.ListVersionsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.S3VersionSummary; +import com.amazonaws.services.s3.model.VersionListing; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -49,7 +55,7 @@ import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service(); + public static AWSService service = AWSServiceFactory.createS3Service(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); private AmazonS3 awsS3Client; @@ -57,6 +63,59 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { private volatile int received; private final int expect = 10; + /** + * Delete an S3 bucket using the provided client. Coming from AWS documentation: + * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java + * + * @param s3Client + * the AmazonS3 client instance used to delete the bucket + * @param bucketName + * a String containing the bucket name + */ + public static void deleteBucket(AmazonS3 s3Client, String bucketName) { + // Delete all objects from the bucket. This is sufficient + // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts + // delete markers for all objects, but doesn't delete the object versions. + // To delete objects from versioned buckets, delete all of the object versions before deleting + // the bucket (see below for an example). + ObjectListing objectListing = s3Client.listObjects(bucketName); + while (true) { + Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator(); + while (objIter.hasNext()) { + s3Client.deleteObject(bucketName, objIter.next().getKey()); + } + + // If the bucket contains many objects, the listObjects() call + // might not return all of the objects in the first listing. Check to + // see whether the listing was truncated. If so, retrieve the next page of objects + // and delete them. + if (objectListing.isTruncated()) { + objectListing = s3Client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + + // Delete all object versions (required for versioned buckets). + VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName)); + while (true) { + Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator(); + while (versionIter.hasNext()) { + S3VersionSummary vs = versionIter.next(); + s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId()); + } + + if (versionList.isTruncated()) { + versionList = s3Client.listNextBatchOfVersions(versionList); + } else { + break; + } + } + + // After all objects and object versions are deleted, delete the bucket. + s3Client.deleteBucket(bucketName); + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-aws-s3-kafka-connector"}; @@ -64,7 +123,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awsS3Client = service.getClient(); + awsS3Client = AWSClientUtils.newS3Client(); received = 0; try { @@ -78,7 +137,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { try { - AWSClientUtils.deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET); + deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET); } catch (Exception e) { LOG.warn("Unable to delete bucked: {}", e.getMessage(), e); } diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java index 93a709c..4675a30 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java @@ -26,13 +26,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.Message; import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -52,7 +52,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<AmazonSQS> service = AWSServiceFactory.createSNSService(); + public static AWSService service = AWSServiceFactory.createSNSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class); @@ -70,7 +70,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awsSqsClient = new AWSSQSClient(service.getClient()); + awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000); sqsQueueUrl = awsSqsClient.getQueue(queueName); diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java index c595c67..b993ad4 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java @@ -25,13 +25,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.Message; import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<AmazonSQS> awsService = AWSServiceFactory.createSQSService(); + public static AWSService awsService = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); @@ -71,7 +71,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awssqsClient = new AWSSQSClient(awsService.getClient()); + awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); queueUrl = awssqsClient.getQueue(queueName); diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java index 786042b..69d9b46 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java @@ -21,12 +21,12 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.AmazonSQS; import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.clients.AWSClientUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<AmazonSQS> service = AWSServiceFactory.createSQSService(); + public static AWSService service = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); @@ -66,7 +66,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awssqsClient = new AWSSQSClient(service.getClient()); + awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); queueUrl = awssqsClient.getQueue(queueName); diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java deleted file mode 100644 index 5fd3339..0000000 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v2.clients; - -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.S3Object; - -public final class AWSSDKClientUtils { - private AWSSDKClientUtils() { - - } - - /** - * Delete an S3 bucket using the provided client. Coming from AWS documentation: - * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html - * - * AWS SDK v1 doc for reference: - * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java - * @param s3Client the AmazonS3 client instance used to delete the bucket - * @param bucketName a String containing the bucket name - */ - public static void deleteBucket(S3Client s3Client, String bucketName) { - // Delete all objects from the bucket. This is sufficient - // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts - // delete markers for all objects, but doesn't delete the object versions. - // To delete objects from versioned buckets, delete all of the object versions before deleting - // the bucket (see below for an example). - ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder() - .bucket(bucketName) - .build(); - - ListObjectsV2Response objectListing; - do { - objectListing = s3Client.listObjectsV2(listObjectsRequest); - - for (S3Object s3Object : objectListing.contents()) { - s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build()); - } - - listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName) - .continuationToken(objectListing.nextContinuationToken()) - .build(); - } while (objectListing.isTruncated()); - - s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()); - } - -} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java index b5cca54..3c25d1b 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; @@ -60,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<KinesisClient> awsService = AWSServiceFactory.createKinesisService(); + public static AWSService awsService = AWSServiceFactory.createKinesisService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); @@ -154,7 +155,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { public void setUp() { streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100); - kinesisClient = awsService.getClient(); + kinesisClient = AWSSDKClientUtils.newKinesisClient(); received = 0; createStream(); diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java index 43f1222..a02a6bd 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java @@ -22,7 +22,6 @@ import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; @@ -30,6 +29,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; @@ -43,7 +43,12 @@ import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -52,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<S3Client> service = AWSServiceFactory.createS3Service(); + public static AWSService service = AWSServiceFactory.createS3Service(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); private S3Client awsS3Client; @@ -66,9 +71,44 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { return new String[] {"camel-aws2-s3-kafka-connector"}; } + /** + * Delete an S3 bucket using the provided client. Coming from AWS documentation: + * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html + * + * AWS SDK v1 doc for reference: + * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java + * @param s3Client the AmazonS3 client instance used to delete the bucket + * @param bucketName a String containing the bucket name + */ + private static void deleteBucket(S3Client s3Client, String bucketName) { + // Delete all objects from the bucket. This is sufficient + // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts + // delete markers for all objects, but doesn't delete the object versions. + // To delete objects from versioned buckets, delete all of the object versions before deleting + // the bucket (see below for an example). + ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder() + .bucket(bucketName) + .build(); + + ListObjectsV2Response objectListing; + do { + objectListing = s3Client.listObjectsV2(listObjectsRequest); + + for (S3Object s3Object : objectListing.contents()) { + s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build()); + } + + listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName) + .continuationToken(objectListing.nextContinuationToken()) + .build(); + } while (objectListing.isTruncated()); + + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()); + } + @BeforeEach public void setUp() { - awsS3Client = service.getClient(); + awsS3Client = AWSSDKClientUtils.newS3Client(); received = 0; bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100); @@ -87,7 +127,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { try { - AWSSDKClientUtils.deleteBucket(awsS3Client, bucketName); + deleteBucket(awsS3Client, bucketName); } catch (Exception e) { LOG.warn("Unable to delete bucked: {}", e.getMessage(), e); } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java index 13daee7..b30deec 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java @@ -32,6 +32,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<SqsClient> awsService = AWSServiceFactory.createSQSService(); + public static AWSService awsService = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); @@ -71,7 +71,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awssqsClient = new AWSSQSClient(awsService.getClient()); + awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); String queueUrl = awssqsClient.getOrCreateQueue(queueName); diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java index c2e6539..a3f2111 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java @@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; @@ -39,14 +40,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsClient; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @RegisterExtension - public static AWSService<SqsClient> service = AWSServiceFactory.createSQSService(); + public static AWSService service = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); @@ -63,7 +63,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - awssqsClient = new AWSSQSClient(service.getClient()); + awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); // TODO: this is a work-around for CAMEL-15833