This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 0a745841f2d6466be18ce183879478acd6c37916
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Feb 8 13:50:51 2021 +0100

    Converted AWS source tests to use the reusable source base class

    Includes:
    - AWS v2 Kinesis source
    - AWS v2 S3 source
    - AWS v2 SQS source
---
 .../source/CamelSourceAWSKinesisITCase.java        |  54 ++-----
 .../kafkaconnector/aws/v2/s3/common/S3Utils.java   |  32 ++++
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 164 +++++----------------
 .../source/CamelSourceAWSS3LargeFilesITCase.java   | 143 ++++++++++++++++++
 .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java |  51 +++----
 .../common/test/CamelSourceTestSupport.java        |  18 ++-
 6 files changed, 261 insertions(+), 201 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e19d8bd..d9e5ac5 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -20,15 +20,14 @@ package org.apache.camel.kafkaconnector.aws.v2.kinesis.source; import java.util.concurrent.ExecutionException; import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test;
@@ -36,8 +35,6 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.KinesisClient; import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
@@ -47,16 +44,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { +public class CamelSourceAWSKinesisITCase extends CamelSourceTestSupport { @RegisterExtension public static AWSService awsService = AWSServiceFactory.createKinesisService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); private String streamName; private KinesisClient kinesisClient; + private String topicName; - private volatile int received; private final int expect = 10; @Override
@@ -66,10 +62,10 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100); kinesisClient = AWSSDKClientUtils.newKinesisClient(); - received = 0; createStream(kinesisClient, streamName); } @@ -80,45 +76,28 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { deleteStream(kinesisClient, streamName); } - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - - - public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - + protected void produceTestData() { putRecords(kinesisClient, streamName, expect); - LOG.debug("Initialized the connector and put the data for the test execution"); + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + @Test @Timeout(120) public void testBasicSendReceive() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withAmazonConfig(awsService.getConnectionProperties()) .withConfiguration(TestKinesisConfiguration.class.getName()) .withStreamName(streamName); - runtTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } @Test @@ -126,12 +105,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withAmazonConfig(awsService.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE) .withConfiguration(TestKinesisConfiguration.class.getName()) .withStreamName(streamName); - runtTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } @Test @@ -139,13 +118,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withAmazonConfig(awsService.getConnectionProperties()) .withConfiguration(TestKinesisConfiguration.class.getName()) .withUrl(streamName) .buildUrl(); - runtTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } - } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java index 25e0ec7..f1e36df 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java @@ -17,6 +17,8 @@ package org.apache.camel.kafkaconnector.aws.v2.s3.common; +import java.io.File; +import java.io.IOException; import java.util.List; import org.slf4j.Logger; @@ -27,6 +29,7 @@ import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Object; public final class S3Utils { @@ -94,4 +97,33 @@ public final class S3Utils { s3Client.createBucket(request); } + + public static File[] getFilesToSend(File dir) throws IOException { + File[] files = dir.listFiles(f -> f.getName().endsWith(".test")); + if (files == null) { + throw new IOException("Either I/O error or the path used is not a directory"); + } + + if (files.length == 0) { + throw new IOException("Not enough files to run the test"); + } + + return files; + } + + public static void sendFilesFromPath(S3Client s3Client, String bucketName, File[] files) { + LOG.debug("Putting S3 objects"); + + for (File file : files) { + LOG.debug("Trying to read file {}", file.getName()); + + + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(bucketName) + .key(file.getName()) + .build(); + + s3Client.putObject(putObjectRequest, file.toPath()); + } + } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java index a1a3e9e..d3efcd6 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java @@ -18,23 +18,24 @@ package org.apache.camel.kafkaconnector.aws.v2.s3.source; import java.io.File; +import java.io.IOException; import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils; import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -45,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket; import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket; @@ -54,32 +54,39 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { - - @FunctionalInterface - private interface SendFunction { - void send(); - } - +public class CamelSourceAWSS3ITCase extends CamelSourceTestSupport { @RegisterExtension public static AWSService service = AWSServiceFactory.createS3Service(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); private S3Client awsS3Client; private String bucketName; + private String topicName; - private volatile int received; private int expect; + private File[] files; @Override protected String[] getConnectorsInTest() { return new String[] {"camel-aws2-s3-kafka-connector"}; } + @BeforeAll + public void setupTestFiles() throws IOException { + final URL resourceDir = this.getClass().getResource("."); + final File baseTestDir = new File(resourceDir.getFile()); + + files = S3Utils.getFilesToSend(baseTestDir); + + expect = files.length; + } + + @BeforeEach public void setUp() { + topicName = getTopicForTest(this); + awsS3Client = AWSSDKClientUtils.newS3Client(); - received = 0; bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100); try { @@ -90,8 +97,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { } } - - @AfterEach public void tearDown() { try { @@ -101,83 +106,30 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { } } - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction) - throws ExecutionException, InterruptedException { - - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - sendFunction.send(); - - LOG.debug("Done putting S3S objects"); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - runTest(connectorPropertyFactory, this::sendFiles); + @Override + protected void produceTestData() { + S3Utils.sendFilesFromPath(awsS3Client, bucketName, files); } - private void sendFilesFromPath(File path) { - LOG.debug("Putting S3 objects"); - - File[] files = path.listFiles(); - if (files == null) { - fail("Either I/O error or the path used is not a directory"); - } - - expect = files.length; - - if (files.length == 0) { - 
fail("Not enough files to run the test"); - } - - for (File file : files) { - LOG.debug("Trying to read file {}", file.getName()); - - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(bucketName) - .key(file.getName()) - .build(); + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); - awsS3Client.putObject(putObjectRequest, file.toPath()); - } + assertEquals(expect, received, "Didn't process the expected amount of messages"); } - private void sendFiles() { - URL resourceDir = this.getClass().getResource("."); - File baseTestDir = new File(resourceDir.getFile()); - - sendFilesFromPath(baseTestDir); - } @Test @Timeout(180) public void testBasicSendReceive() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withConfiguration(TestS3Configuration.class.getName()) .withBucketNameOrArn(bucketName) .withAmazonConfig(service.getConnectionProperties()); - runTest(connectorPropertyFactory); - - assertEquals(expect, received, "Didn't process the expected amount of messages"); + runTest(connectorPropertyFactory, topicName, expect); } @Test @@ -185,15 +137,13 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withConfiguration(TestS3Configuration.class.getName()) .withMaxMessagesPerPoll(5) .withBucketNameOrArn(bucketName) .withAmazonConfig(service.getConnectionProperties()); - runTest(connectorPropertyFactory); - - assertEquals(expect, received, "Didn't process the expected amount of messages"); + runTest(connectorPropertyFactory, topicName, expect); } @Test @@ -201,14 +151,12 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withConfiguration(TestS3Configuration.class.getName()) .withBucketNameOrArn(bucketName) .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); - runTest(connectorPropertyFactory); - - assertEquals(expect, received, "Didn't process the expected amount of messages"); + runTest(connectorPropertyFactory, topicName, expect); } @Test @@ -218,7 +166,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withConfiguration(TestS3Configuration.class.getName()) .withUrl(bucketName) .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) @@ -227,46 +175,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id())) .buildUrl(); - runTest(connectorPropertyFactory); - - assertEquals(expect, received, "Didn't process the expected amount of messages"); - } - - - - /* To run this 
test create (large) files in the a test directory - (ie.: dd if=/dev/random of=large bs=512 count=50000) - - Then run it with: - - mvn -DskipIntegrationTests=false -Denable.slow.tests=true - -Daws-service.s3.test.directory=/path/to/manual-s3 - -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify - */ - @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*", - disabledReason = "Manual test that requires the user to provide a directory with files") - @Test - @Timeout(value = 60, unit = TimeUnit.MINUTES) - public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory - .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withConfiguration(TestS3Configuration.class.getName()) - .withBucketNameOrArn(bucketName) - .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); - - String filePath = System.getProperty("aws-service.s3.test.directory"); - - File path = new File(filePath); - - runTest(connectorPropertyFactory, () -> sendFilesFromPath(path)); - - String[] files = path.list(); - if (files == null) { - fail("Either I/O error or the path used is not a directory"); - } - - assertEquals(files.length, received, "Didn't process the expected amount of messages"); + runTest(connectorPropertyFactory, topicName, expect); } - } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java new file mode 100644 index 0000000..6d7df7e --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.aws.v2.s3.source; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils; +import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.AWSCommon; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket; +import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +/* To run this test create (large) files in the a test directory + (ie.: dd if=/dev/random of=large.test bs=512 count=50000) + + Note: they must have the .test extension. 
+ + Then run it with: + + mvn -DskipIntegrationTests=false -Daws-service.s3.test.directory=/path/to/manual-s3 + -Dit.test=CamelSourceAWSS3LargeFilesITCase verify +*/ +@EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*", + disabledReason = "Manual test that requires the user to provide a directory with files") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CamelSourceAWSS3LargeFilesITCase extends CamelSourceTestSupport { + @RegisterExtension + public static AWSService service = AWSServiceFactory.createS3Service(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3LargeFilesITCase.class); + + private S3Client awsS3Client; + private String bucketName; + private String topicName; + + private int expect; + private File[] files; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-s3-kafka-connector"}; + } + + @BeforeAll + public void setupTestFiles() throws IOException { + String filePath = System.getProperty("aws-service.s3.test.directory"); + File baseTestDir = new File(filePath); + + files = S3Utils.getFilesToSend(baseTestDir); + + expect = files.length; + } + + + @BeforeEach + public void setUp() { + topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + awsS3Client = AWSSDKClientUtils.newS3Client(); + bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100); + + try { + createBucket(awsS3Client, bucketName); + } catch (Exception e) { + LOG.error("Unable to create bucket: {}", e.getMessage(), e); + fail("Unable to create bucket"); + } + } + + @AfterEach + public void tearDown() { + try { + deleteBucket(awsS3Client, bucketName); + } catch (Exception e) { + LOG.warn("Unable to delete bucked: {}", e.getMessage(), e); + } + } + + @Override + protected void produceTestData() { + S3Utils.sendFilesFromPath(awsS3Client, bucketName, files); + } + + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); + } + + + @Test + @Timeout(value = 60, unit = TimeUnit.MINUTES) + public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory + .basic() + .withKafkaTopic(topicName) + .withConfiguration(TestS3Configuration.class.getName()) + .withBucketNameOrArn(bucketName) + .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); + + runTest(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java index e9bdf96..d4b11ac 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java @@ -21,16 +21,15 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import 
org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,16 +47,15 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { - +public class CamelSourceAWSSQSITCase extends CamelSourceTestSupport { @RegisterExtension public static AWSService service = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); private AWSSQSClient awssqsClient; private String queueName; + private String topicName; - private volatile int received; private final int expect = 10; @Override @@ -67,12 +65,13 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); + awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient()); queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); // TODO: this is a work-around for CAMEL-15833 awssqsClient.createQueue(queueName); - received = 0; } @AfterEach @@ -82,32 +81,18 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { } } - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); - + @Override + protected void produceTestData() { LOG.debug("Sending SQS messages"); for (int i = 0; i < expect; i++) { awssqsClient.send(queueName, "Source test message " + i); } LOG.debug("Done sending SQS messages"); + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } @@ -116,11 +101,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { public void testBasicSendReceive() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withQueueOrArn(queueName) 
.withAmazonConfig(service.getConnectionProperties()); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } // This test does not run remotely because SQS has a cool down period for @@ -131,11 +116,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withQueueOrArn(queueName) .withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } // This test does not run remotely because SQS has a cool down period for @@ -148,7 +133,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withUrl(queueName) .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) @@ -157,6 +142,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString())) .buildUrl(); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java index 7c9ee9b..35626a3 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java @@ -56,12 +56,26 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { * @throws Exception For test-specific exceptions */ public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException { + runTest(connectorPropertyFactory, consumer, this::produceTestData); + } + + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @param consumer A Kafka consumer consumer for the test messages + * @param producer A producer for the test messages + * @throws Exception For test-specific exceptions + */ + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer, + FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); LOG.debug("Initialized the connector and put the data for the test execution"); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); +// getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + getKafkaConnectService().initializeConnector(connectorPropertyFactory); LOG.debug("Producing test data to be collected by the connector and sent to Kafka"); - produceTestData(); + producer.produceMessages(); LOG.debug("Creating 
the Kafka consumer ..."); consumer.consumeMessages();
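
Note for readers following the migration: the sketch below is not part of the commit. It is a condensed, hypothetical example (class name CamelSourceExampleITCase, queue cleanup omitted, placed in the SQS source test package so CamelAWSSQSPropertyFactory resolves) that illustrates the shape every converted source test now follows. The base class CamelSourceTestSupport drives connector initialization, test-data production and Kafka consumption through runTest(), while the subclass only supplies produceTestData() and verifyMessages(); the helper and factory calls mirror the SQS test in the diff above.

package org.apache.camel.kafkaconnector.aws.v2.sqs.source;

import java.util.concurrent.ExecutionException;

import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical sketch of a migrated source test; not part of this commit
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceExampleITCase extends CamelSourceTestSupport {
    @RegisterExtension
    public static AWSService service = AWSServiceFactory.createSQSService();

    private AWSSQSClient awssqsClient;
    private String queueName;
    private String topicName;

    private final int expect = 10;

    @Override
    protected String[] getConnectorsInTest() {
        // Connector artifact exercised by this test
        return new String[] {"camel-aws2-sqs-kafka-connector"};
    }

    @BeforeEach
    public void setUp() {
        // The base class derives a per-test Kafka topic name
        topicName = getTopicForTest(this);

        awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
        queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
        awssqsClient.createQueue(queueName);
    }

    @Override
    protected void produceTestData() {
        // Called by runTest() after the connector has been initialized
        for (int i = 0; i < expect; i++) {
            awssqsClient.send(queueName, "Source test message " + i);
        }
    }

    @Override
    protected void verifyMessages(TestMessageConsumer<?> consumer) {
        // Called by runTest() once the Kafka consumer has finished reading the topic
        int received = consumer.consumedMessages().size();

        assertEquals(expect, received, "Didn't process the expected amount of messages");
    }

    @Test
    @Timeout(120)
    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                .basic()
                .withKafkaTopic(topicName)
                .withQueueOrArn(queueName)
                .withAmazonConfig(service.getConnectionProperties());

        // Initialize the connector, produce the test data, consume from the topic, then verify
        runTest(connectorPropertyFactory, topicName, expect);
    }
}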