This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 79926ef8e75f968f9befeb52ed9be49b04d45522
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Dec 16 10:19:02 2020 +0100

    Added new S3 v2 manual test that supports testing w/ large files
---
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 102 +++++++++++++++++----
 1 file changed, 83 insertions(+), 19 deletions(-)

diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index c1aafe7..4169816 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.URL;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -58,6 +59,11 @@ import static org.junit.jupiter.api.Assertions.fail;
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
 public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
 
+    @FunctionalInterface
+    private interface SendFunction {
+        void send();
+    }
+
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createS3Service();
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
@@ -66,7 +72,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest 
{
     private String bucketName;
 
     private volatile int received;
-    private final int expect = 10;
+    private int expect;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -146,35 +152,53 @@ public class CamelSourceAWSS3ITCase extends 
AbstractKafkaTest {
         return true;
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, 
SendFunction sendFunction)
+            throws ExecutionException, InterruptedException {
+
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        LOG.debug("Putting S3 objects");
-        for (int i = 0; i < expect; i++) {
-            String name = "file" + i + ".test";
-            LOG.debug("Trying to read file {}", name);
-            URL fileResource = this.getClass().getResource(name);
-            LOG.debug("Found file at {}", fileResource.getPath());
-            String file = fileResource.getFile();
-            LOG.debug("Using file {}", file);
-
-            LOG.trace("Putting file {}", file);
-            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(name)
-                    .build();
+        sendFunction.send();
 
-            awsS3Client.putObject(putObjectRequest, new File(file).toPath());
-        }
         LOG.debug("Done putting S3S objects");
 
         LOG.debug("Creating the consumer ...");
         KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
         kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
         LOG.debug("Created the consumer ...");
+    }
 
-        assertEquals(received, expect, "Didn't process the expected amount of 
messages");
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
+        runTest(connectorPropertyFactory, this::sendFiles);
+    }
+
+    private void sendFilesFromPath(File path) {
+        LOG.debug("Putting S3 objects");
+
+        File[] files = path.listFiles();
+        expect = files.length;
+
+        if (files.length == 0) {
+            fail("Not enough files to run the test");
+        }
+
+        for (File file : files) {
+            LOG.debug("Trying to read file {}", file.getName());
+
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(file.getName())
+                    .build();
+
+            awsS3Client.putObject(putObjectRequest, file.toPath());
+        }
+    }
+
+    private void sendFiles() {
+        URL resourceDir = this.getClass().getResource(".");
+        File baseTestDir = new File(resourceDir.getFile());
+
+        sendFilesFromPath(baseTestDir);
     }
 
     @Test
@@ -188,6 +212,8 @@ public class CamelSourceAWSS3ITCase extends 
AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties());
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of 
messages");
     }
 
     @Test
@@ -202,6 +228,8 @@ public class CamelSourceAWSS3ITCase extends 
AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties());
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of 
messages");
     }
 
     @Test
@@ -215,6 +243,8 @@ public class CamelSourceAWSS3ITCase extends 
AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties(), 
CamelAWSS3PropertyFactory.KAFKA_STYLE);
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of 
messages");
     }
 
     @Test
@@ -234,6 +264,40 @@ public class CamelSourceAWSS3ITCase extends 
AbstractKafkaTest {
                 .buildUrl();
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of 
messages");
+    }
+
+
+
+    /* To run this test, create (large) files in a test directory
+        (e.g.: dd if=/dev/random of=large bs=512 count=50000)
+
+        Then run it with:
+
+        mvn -DskipIntegrationTests=false -Denable.slow.tests=true
+            -Daws-service.s3.test.directory=/path/to/manual-s3
+            
-Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile 
verify
+     */
+    @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches 
= ".*",
+            disabledReason = "Manual test that requires the user to provide a 
directory with files")
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.MINUTES)
+    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws 
ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties(), 
CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+        String filePath = System.getProperty("aws-service.s3.test.directory");
+
+        File path = new File(filePath);
+
+        runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
+
+        assertEquals(path.list().length, received, "Didn't process the 
expected amount of messages");
     }
 
 }

Reply via email to