This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 79926ef8e75f968f9befeb52ed9be49b04d45522 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Dec 16 10:19:02 2020 +0100 Added new S3 v2 manual test that supports testing w/ large files --- .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java | 102 +++++++++++++++++---- 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java index c1aafe7..4169816 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java @@ -21,6 +21,7 @@ import java.io.File; import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; @@ -58,6 +59,11 @@ import static org.junit.jupiter.api.Assertions.fail; @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { + @FunctionalInterface + private interface SendFunction { + void send(); + } + @RegisterExtension public static AWSService service = AWSServiceFactory.createS3Service(); private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); @@ -66,7 +72,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { private String bucketName; private volatile int received; - private final int expect = 10; + private int expect; @Override protected String[] getConnectorsInTest() { @@ -146,35 +152,53 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { return true; } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction) + throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); getKafkaConnectService().initializeConnector(connectorPropertyFactory); - LOG.debug("Putting S3 objects"); - for (int i = 0; i < expect; i++) { - String name = "file" + i + ".test"; - LOG.debug("Trying to read file {}", name); - URL fileResource = this.getClass().getResource(name); - LOG.debug("Found file at {}", fileResource.getPath()); - String file = fileResource.getFile(); - LOG.debug("Using file {}", file); - - LOG.trace("Putting file {}", file); - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(bucketName) - .key(name) - .build(); + sendFunction.send(); - awsS3Client.putObject(putObjectRequest, new File(file).toPath()); - } LOG.debug("Done putting S3S objects"); LOG.debug("Creating the consumer ..."); KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); LOG.debug("Created the consumer ..."); + } - assertEquals(received, expect, "Didn't process the expected amount of messages"); + public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + runTest(connectorPropertyFactory, this::sendFiles); + } + + private void sendFilesFromPath(File path) { + LOG.debug("Putting S3 objects"); + + File[] files = path.listFiles(); + expect = files.length; + + if (files.length == 0) { + fail("Not enough files to run the test"); + } + + for (File file : files) { + LOG.debug("Trying to read file {}", file.getName()); + + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(bucketName) + .key(file.getName()) + .build(); + + awsS3Client.putObject(putObjectRequest, file.toPath()); + } + } + + private void sendFiles() { + URL resourceDir = this.getClass().getResource("."); + File baseTestDir = new File(resourceDir.getFile()); + + sendFilesFromPath(baseTestDir); } @Test @@ -188,6 +212,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .withAmazonConfig(service.getConnectionProperties()); runTest(connectorPropertyFactory); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); } @Test @@ -202,6 +228,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .withAmazonConfig(service.getConnectionProperties()); runTest(connectorPropertyFactory); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); } @Test @@ -215,6 +243,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); runTest(connectorPropertyFactory); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); } @Test @@ -234,6 +264,40 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { .buildUrl(); runTest(connectorPropertyFactory); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); + } + + + + /* To run this test create (large) files in the a test directory + (ie.: dd if=/dev/random of=large bs=512 count=50000) + + Then run it with: + + mvn -DskipIntegrationTests=false -Denable.slow.tests=true + -Daws-service.s3.test.directory=/path/to/manual-s3 + -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify + */ + @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*", + disabledReason = "Manual test that requires the user to provide a directory with files") + @Test + @Timeout(value = 60, unit = TimeUnit.MINUTES) + public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory + .basic() + .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withConfiguration(TestS3Configuration.class.getName()) + .withBucketNameOrArn(bucketName) + .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); + + String filePath = System.getProperty("aws-service.s3.test.directory"); + + File path = new File(filePath); + + runTest(connectorPropertyFactory, () -> sendFilesFromPath(path)); + + assertEquals(path.list().length, received, "Didn't process the expected amount of messages"); } }