This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 73e414d80f984ae7d497f0f138549a515c33f3f0 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Apr 2 07:55:10 2021 +0200 Camel-AWS2-S3: Extract common method in an utility class --- .../camel/component/aws2/s3/AWS2S3Producer.java | 89 +++++----------------- .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 67 +--------------- .../camel/component/aws2/s3/utils/AWS2S3Utils.java | 87 +++++++++++++++++++++ 3 files changed, 110 insertions(+), 133 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java index f497d17..9ee8ecb 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java @@ -33,6 +33,7 @@ import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.WrappedFile; +import org.apache.camel.component.aws2.s3.utils.AWS2S3Utils; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; @@ -153,11 +154,11 @@ public class AWS2S3Producer extends DefaultProducer { objectMetadata.put("Content-Length", String.valueOf(filePayload.length())); } - final String keyName = determineKey(exchange); + final String keyName = AWS2S3Utils.determineKey(exchange, getConfiguration()); CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName); - String storageClass = determineStorageClass(exchange); + String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); if (storageClass != null) { createMultipartUploadRequest.storageClass(storageClass); } @@ -270,7 +271,7 @@ public class AWS2S3Producer extends DefaultProducer { if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) { LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory"); - baos = determineLengthInputStream(is); + baos = AWS2S3Utils.determineLengthInputStream(is); objectMetadata.put("Content-Length", String.valueOf(baos.size())); is = new ByteArrayInputStream(baos.toByteArray()); } else { @@ -281,11 +282,11 @@ public class AWS2S3Producer extends DefaultProducer { } } - final String bucketName = determineBucketName(exchange); - final String key = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String key = AWS2S3Utils.determineKey(exchange, getConfiguration()); putObjectRequest.bucket(bucketName).key(key).metadata(objectMetadata); - String storageClass = determineStorageClass(exchange); + String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); if (storageClass != null) { putObjectRequest.storageClass(storageClass); } @@ -344,8 +345,8 @@ public class AWS2S3Producer extends DefaultProducer { } private void copyObject(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - final String bucketName = determineBucketName(exchange); - final String sourceKey = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String sourceKey = AWS2S3Utils.determineKey(exchange, getConfiguration()); final String destinationKey = exchange.getIn().getHeader(AWS2S3Constants.DESTINATION_KEY, String.class); final String bucketNameDestination = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_DESTINATION_NAME, String.class); if (getConfiguration().isPojoRequest()) { @@ -396,8 +397,8 @@ public class AWS2S3Producer extends DefaultProducer { } private void deleteObject(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - final String bucketName = determineBucketName(exchange); - final String sourceKey = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String sourceKey = AWS2S3Utils.determineKey(exchange, getConfiguration()); if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); if (payload instanceof DeleteObjectRequest) { @@ -423,7 +424,7 @@ public class AWS2S3Producer extends DefaultProducer { } private void deleteBucket(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - final String bucketName = determineBucketName(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); @@ -453,8 +454,8 @@ public class AWS2S3Producer extends DefaultProducer { message.setBody(res); } } else { - final String bucketName = determineBucketName(exchange); - final String sourceKey = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String sourceKey = AWS2S3Utils.determineKey(exchange,getConfiguration()); GetObjectRequest.Builder req = GetObjectRequest.builder().bucket(bucketName).key(sourceKey); ResponseInputStream<GetObjectResponse> res = s3Client.getObject(req.build(), ResponseTransformer.toInputStream()); @@ -464,8 +465,8 @@ public class AWS2S3Producer extends DefaultProducer { } private void getObjectRange(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - final String bucketName = determineBucketName(exchange); - final String sourceKey = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String sourceKey = AWS2S3Utils.determineKey(exchange, getConfiguration()); final String rangeStart = exchange.getIn().getHeader(AWS2S3Constants.RANGE_START, String.class); final String rangeEnd = exchange.getIn().getHeader(AWS2S3Constants.RANGE_END, String.class); @@ -494,7 +495,7 @@ public class AWS2S3Producer extends DefaultProducer { } private void listObjects(S3Client s3Client, Exchange exchange) throws InvalidPayloadException { - final String bucketName = determineBucketName(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); @@ -513,8 +514,8 @@ public class AWS2S3Producer extends DefaultProducer { } private void createDownloadLink(S3Client s3Client, Exchange exchange) { - final String bucketName = determineBucketName(exchange); - final String key = determineKey(exchange); + final String bucketName = AWS2S3Utils.determineBucketName(exchange, getConfiguration()); + final String key = AWS2S3Utils.determineKey(exchange, getConfiguration()); long milliSeconds = 0; @@ -599,58 +600,6 @@ public class AWS2S3Producer extends DefaultProducer { return objectMetadata; } - /** - * Reads the bucket name from the header of the given exchange. If not provided, it's read from the endpoint - * configuration. - * - * @param exchange The exchange to read the header from. - * @return The bucket name. - * @throws IllegalArgumentException if the header could not be determined. - */ - private String determineBucketName(final Exchange exchange) { - String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class); - - if (ObjectHelper.isEmpty(bucketName)) { - bucketName = getConfiguration().getBucketName(); - LOG.trace("AWS S3 Bucket name header is missing, using default one [{}]", bucketName); - } - - if (bucketName == null) { - throw new IllegalArgumentException("AWS S3 Bucket name header is missing or not configured."); - } - - return bucketName; - } - - private String determineKey(final Exchange exchange) { - String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class); - if (ObjectHelper.isEmpty(key)) { - key = getConfiguration().getKeyName(); - } - if (key == null) { - throw new IllegalArgumentException("AWS S3 Key header missing."); - } - return key; - } - - private String determineStorageClass(final Exchange exchange) { - String storageClass = exchange.getIn().getHeader(AWS2S3Constants.STORAGE_CLASS, String.class); - if (storageClass == null) { - storageClass = getConfiguration().getStorageClass(); - } - - return storageClass; - } - - private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] bytes = new byte[1024]; - int count; - while ((count = is.read(bytes)) > 0) { - out.write(bytes, 0, count); - } - return out; - } protected AWS2S3Configuration getConfiguration() { return getEndpoint().getConfiguration(); diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index a4012b6..2d06bd9 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -30,6 +30,7 @@ import org.apache.camel.Message; import org.apache.camel.component.aws2.s3.AWS2S3Configuration; import org.apache.camel.component.aws2.s3.AWS2S3Constants; import org.apache.camel.component.aws2.s3.AWS2S3Endpoint; +import org.apache.camel.component.aws2.s3.utils.AWS2S3Utils; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -77,8 +78,8 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { buffer.write(IoUtils.toByteArray(is)); final String keyName = getConfiguration().getKeyName(); - final String fileName = determineFileName(keyName); - final String extension = determineFileExtension(keyName); + final String fileName = AWS2S3Utils.determineFileName(keyName); + final String extension = AWS2S3Utils.determineFileExtension(keyName); if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) { id = UUID.randomUUID(); } @@ -86,7 +87,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName); - String storageClass = determineStorageClass(exchange); + String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); if (storageClass != null) { createMultipartUploadRequest.storageClass(storageClass); } @@ -216,66 +217,6 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { return dynamicKeyName; } - /** - * Reads the bucket name from the header of the given exchange. If not provided, it's read from the endpoint - * configuration. - * - * @param exchange The exchange to read the header from. - * @return The bucket name. - * @throws IllegalArgumentException if the header could not be determined. - */ - private String determineBucketName(final Exchange exchange) { - String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class); - - if (ObjectHelper.isEmpty(bucketName)) { - bucketName = getConfiguration().getBucketName(); - LOG.trace("AWS S3 Bucket name header is missing, using default one [{}]", bucketName); - } - - if (bucketName == null) { - throw new IllegalArgumentException("AWS S3 Bucket name header is missing or not configured."); - } - - return bucketName; - } - - private String determineStorageClass(final Exchange exchange) { - String storageClass = exchange.getIn().getHeader(AWS2S3Constants.STORAGE_CLASS, String.class); - if (storageClass == null) { - storageClass = getConfiguration().getStorageClass(); - } - - return storageClass; - } - - private String determineFileExtension(String keyName) { - int extPosition = keyName.lastIndexOf("."); - if (extPosition == -1) { - return ""; - } else { - return keyName.substring(extPosition); - } - } - - private String determineFileName(String keyName) { - int extPosition = keyName.lastIndexOf("."); - if (extPosition == -1) { - return keyName; - } else { - return keyName.substring(0, extPosition); - } - } - - private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] bytes = new byte[1024]; - int count; - while ((count = is.read(bytes)) > 0) { - out.write(bytes, 0, count); - } - return out; - } - protected AWS2S3Configuration getConfiguration() { return getEndpoint().getConfiguration(); } diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java new file mode 100644 index 0000000..7f9d669 --- /dev/null +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java @@ -0,0 +1,87 @@ +package org.apache.camel.component.aws2.s3.utils; + +import org.apache.camel.Exchange; +import org.apache.camel.component.aws2.s3.AWS2S3Configuration; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.util.ObjectHelper; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +public final class AWS2S3Utils { + + private AWS2S3Utils() { + } + + /** + * Reads the bucket name from the header of the given exchange. If not provided, it's read from the endpoint + * configuration. + * + * @param exchange The exchange to read the header from + * @param configuration The AWS2 S3 configuration + * @return The bucket name. + * @throws IllegalArgumentException if the header could not be determined. + */ + public static String determineBucketName(final Exchange exchange, AWS2S3Configuration configuration) { + String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class); + + if (ObjectHelper.isEmpty(bucketName)) { + bucketName = configuration.getBucketName(); + } + + if (bucketName == null) { + throw new IllegalArgumentException("AWS S3 Bucket name header is missing or not configured."); + } + + return bucketName; + } + + public static String determineStorageClass(final Exchange exchange, AWS2S3Configuration configuration) { + String storageClass = exchange.getIn().getHeader(AWS2S3Constants.STORAGE_CLASS, String.class); + if (storageClass == null) { + storageClass = configuration.getStorageClass(); + } + + return storageClass; + } + + public static String determineFileExtension(String keyName) { + int extPosition = keyName.lastIndexOf("."); + if (extPosition == -1) { + return ""; + } else { + return keyName.substring(extPosition); + } + } + + public static String determineFileName(String keyName) { + int extPosition = keyName.lastIndexOf("."); + if (extPosition == -1) { + return keyName; + } else { + return keyName.substring(0, extPosition); + } + } + + public static ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] bytes = new byte[1024]; + int count; + while ((count = is.read(bytes)) > 0) { + out.write(bytes, 0, count); + } + return out; + } + + public static String determineKey(final Exchange exchange, AWS2S3Configuration configuration) { + String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class); + if (ObjectHelper.isEmpty(key)) { + key = configuration.getKeyName(); + } + if (key == null) { + throw new IllegalArgumentException("AWS S3 Key header missing."); + } + return key; + } +}