This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fb64227308f57b0f500b2aef65e7c0c051db58c3
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Apr 6 06:55:38 2021 +0200

    CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
 .../aws2/s3/AWS2S3ComponentConfigurer.java         |  9 ++-------
 .../aws2/s3/AWS2S3EndpointConfigurer.java          |  9 ++-------
 .../component/aws2/s3/AWS2S3Configuration.java     | 23 +++++++++++++++++-----
 .../camel/component/aws2/s3/AWS2S3Endpoint.java    |  2 +-
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 16 ++++++++++-----
 .../S3StreamUploadOperationLocalstackTest.java     |  2 +-
 .../S3StreamUploadTimeoutLocalstackTest.java       |  2 +-
 7 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
index 182f1b9..89a4c80 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.aws2.s3;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
 import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -105,7 +100,7 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
         case "storageclass":
         case "storageClass": getOrCreateConfiguration(target).setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
         case "streammode":
-        case "streamMode": getOrCreateConfiguration(target).setStreamMode(property(camelContext, boolean.class, value)); return true;
+        case "streamMode": getOrCreateConfiguration(target).setStreamingUploadMode(property(camelContext, boolean.class, value)); return true;
         case "trustallcertificates":
         case "trustAllCertificates": getOrCreateConfiguration(target).setTrustAllCertificates(property(camelContext, boolean.class, value)); return true;
         case "uriendpointoverride":
@@ -301,7 +296,7 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
         case "storageclass":
         case "storageClass": return getOrCreateConfiguration(target).getStorageClass();
         case "streammode":
-        case "streamMode": return getOrCreateConfiguration(target).isStreamMode();
+        case "streamMode": return getOrCreateConfiguration(target).isStreamingUploadMode();
         case "trustallcertificates":
         case "trustAllCertificates": return getOrCreateConfiguration(target).isTrustAllCertificates();
         case "uriendpointoverride":
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
index 19f618f..bd98c61 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.aws2.s3;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
 import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -128,7 +123,7 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
         case "storageclass":
         case "storageClass": target.getConfiguration().setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
         case "streammode":
-        case "streamMode": target.getConfiguration().setStreamMode(property(camelContext, boolean.class, value)); return true;
+        case "streamMode": target.getConfiguration().setStreamingUploadMode(property(camelContext, boolean.class, value)); return true;
         case "timeunit":
         case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true;
         case "trustallcertificates":
@@ -392,7 +387,7 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
         case "storageclass":
         case "storageClass": return target.getConfiguration().getStorageClass();
         case "streammode":
-        case "streamMode": return target.getConfiguration().isStreamMode();
+        case "streamMode": return target.getConfiguration().isStreamingUploadMode();
         case "timeunit":
         case "timeUnit": return target.getTimeUnit();
         case "trustallcertificates":
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index d135306..0f48a66 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -112,13 +112,15 @@ public class AWS2S3Configuration implements Cloneable {
     @UriParam(defaultValue = "false")
     private boolean pojoRequest;
     @UriParam(defaultValue = "false", label = "producer")
-    private boolean streamMode;
+    private boolean streamingUploadMode;
     @UriParam(defaultValue = "10", label = "producer")
     private int batchMessageNumber = 10;
     @UriParam(defaultValue = "1000000", label = "producer")
     private int batchSize = 1000000;
     @UriParam(defaultValue = "progressive", label = "producer")
     private AWSS3NamingStrategyEnum namingStrategy = AWSS3NamingStrategyEnum.progressive;
+    @UriParam(label = "producer")
+    private long streamingUploadTimeout;
 
     public long getPartSize() {
         return partSize;
@@ -584,15 +586,15 @@ public class AWS2S3Configuration implements Cloneable {
         this.amazonS3Presigner = amazonS3Presigner;
     }
 
-    public boolean isStreamMode() {
-        return streamMode;
+    public boolean isStreamingUploadMode() {
+        return streamingUploadMode;
     }
 
     /**
      * When stream mode is true the upload to bucket will be done in streaming
      */
-    public void setStreamMode(boolean streamMode) {
-        this.streamMode = streamMode;
+    public void setStreamingUploadMode(boolean streamingUploadMode) {
+        this.streamingUploadMode = streamingUploadMode;
     }
 
     public int getBatchMessageNumber() {
@@ -628,6 +630,17 @@ public class AWS2S3Configuration implements Cloneable {
         this.namingStrategy = namingStrategy;
     }
 
+    public long getStreamingUploadTimeout() {
+        return streamingUploadTimeout;
+    }
+
+    /**
+     * While streaming upload mode is true, this option set the timeout to complete upload
+     */
+    public void setStreamingUploadTimeout(long streamingUploadTimeout) {
+        this.streamingUploadTimeout = streamingUploadTimeout;
+    }
+
     public AWS2S3Configuration copy() {
         try {
             return (AWS2S3Configuration) super.clone();
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
index 32d03835..f0bf82a 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
@@ -73,7 +73,7 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        if (!configuration.isStreamMode()) {
+        if (!configuration.isStreamingUploadMode()) {
             return new AWS2S3Producer(this);
         } else {
             return new AWS2S3StreamUploadProducer(this);
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index f4d544d..21e8b53c 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -73,10 +73,12 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        timeoutCheckerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
-                        "timeout_checker");
-        timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 10, 10, TimeUnit.SECONDS);
+        if (ObjectHelper.isNotEmpty(getConfiguration().getStreamingUploadTimeout())) {
+            timeoutCheckerExecutorService
+                    = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+                    "timeout_checker");
+            timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), getConfiguration().getStreamingUploadTimeout(), getConfiguration().getStreamingUploadTimeout(), TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
@@ -89,6 +91,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                 }
             }
         }
+        if (timeoutCheckerExecutorService != null) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
+            timeoutCheckerExecutorService = null;
+        }
         super.doStop();
 
     }
@@ -276,7 +282,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     @Override
     public String toString() {
         if (s3ProducerToString == null) {
-            s3ProducerToString = "S3Producer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+            s3ProducerToString = "AWS2S3StreamUploadProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
         }
         return s3ProducerToString;
     }
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
index 0db9740..7330813 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
@@ -71,7 +71,7 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
             @Override
             public void configure() throws Exception {
                 String awsEndpoint1
-                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
 
                 from("direct:stream1").to(awsEndpoint1).to("mock:result");
 
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
index f42a153..d958e1c 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
@@ -55,7 +55,7 @@ public class S3StreamUploadTimeoutLocalstackTest extends Aws2S3BaseTest {
             @Override
             public void configure() throws Exception {
                 String awsEndpoint1
-                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
 
                 from("direct:stream1").to(awsEndpoint1).to("mock:result");
 

Reply via email to