This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch s3-kms
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9561db08ce766f1b32d6c7521629fb58b1a4fa65
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Feb 9 15:12:19 2021 -0800

    Support S3 with server side encryption mode aws:kms
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 132 +++++++++++++--------
 1 file changed, 82 insertions(+), 50 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index fff5a2c..55873ec 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.filesystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +31,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -63,6 +64,7 @@ import 
software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 
 /**
@@ -74,22 +76,52 @@ public class S3PinotFS extends PinotFS {
   public static final String REGION = "region";
   public static final String ENDPOINT = "endpoint";
   public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
+  public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = 
"serverSideEncryption";
+  public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
+  public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY = 
"ssekmsEncryptionContext";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(S3PinotFS.class);
   private static final String DELIMITER = "/";
   public static final String S3_SCHEME = "s3://";
-  private static boolean DEFAULT_DISABLE_ACL = true;
+  private static final boolean DEFAULT_DISABLE_ACL = true;
   private S3Client _s3Client;
-  private boolean disableAcl = DEFAULT_DISABLE_ACL;
+  private boolean _disableAcl = DEFAULT_DISABLE_ACL;
+  private ServerSideEncryption _serverSideEncryption = null;
+  private String _ssekmsKeyId;
+  private String _ssekmsEncryptionContext;
 
   @Override
   public void init(PinotConfiguration config) {
     Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)));
     String region = config.getProperty(REGION);
-    disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, 
DEFAULT_DISABLE_ACL);
+    _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, 
DEFAULT_DISABLE_ACL);
+    String serverSideEncryption = 
config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
+    if (serverSideEncryption != null) {
+      try {
+        _serverSideEncryption = 
ServerSideEncryption.valueOf(serverSideEncryption);
+      } catch (Exception e) {
+        throw new UnsupportedOperationException(String
+            .format("Unknown value '%s' for S3PinotFS config: 
'serverSideEncryption'. Supported values are: %s",
+                serverSideEncryption, 
Arrays.toString(ServerSideEncryption.knownValues().toArray())));
+      }
+      switch (_serverSideEncryption) {
+        case AWS_KMS:
+          _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+          if (_ssekmsKeyId == null) {
+            throw new UnsupportedOperationException(
+                "Missing required config: 'sseKmsKeyId' when AWS_KMS is used 
for server side encryption");
+          }
+          _ssekmsEncryptionContext = 
config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+          break;
+        case AES256:
+          // Todo: Support AES256.
+        default:
+          throw new UnsupportedOperationException("Unsupported server side 
encryption: " + _serverSideEncryption);
+      }
+    }
     AwsCredentialsProvider awsCredentialsProvider;
-    try {
 
+    try {
       if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && 
!isNullOrEmpty(config.getProperty(SECRET_KEY))) {
         String accessKey = config.getProperty(ACCESS_KEY);
         String secretKey = config.getProperty(SECRET_KEY);
@@ -99,7 +131,8 @@ public class S3PinotFS extends PinotFS {
         awsCredentialsProvider = DefaultCredentialsProvider.create();
       }
 
-      S3ClientBuilder s3ClientBuilder = 
S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
+      S3ClientBuilder s3ClientBuilder =
+          
S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
       if (!isNullOrEmpty(config.getProperty(ENDPOINT))) {
         String endpoint = config.getProperty(ENDPOINT);
         try {
@@ -248,14 +281,7 @@ public class S3PinotFS extends PinotFS {
       }
 
       String dstPath = sanitizePath(dstUri.getPath());
-      CopyObjectRequest.Builder copyReqBuilder = 
CopyObjectRequest.builder().copySource(encodedUrl)
-          .destinationBucket(dstUri.getHost()).destinationKey(dstPath);
-
-      if (!disableAcl) {
-        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      CopyObjectRequest copyReq = copyReqBuilder.build();
+      CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, 
dstUri, dstPath, null);
       CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
       return copyObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (S3Exception e) {
@@ -275,15 +301,8 @@ public class S3PinotFS extends PinotFS {
         return true;
       }
 
-      PutObjectRequest.Builder putReqBuilder = 
PutObjectRequest.builder().bucket(uri.getHost()).key(path);
-
-      if (!disableAcl) {
-        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      PutObjectRequest putObjectRequest = putReqBuilder.build();
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
       PutObjectResponse putObjectResponse = 
_s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
-
       return putObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (Throwable t) {
       throw new IOException(t);
@@ -477,13 +496,7 @@ public class S3PinotFS extends PinotFS {
     LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
     URI base = getBase(dstUri);
     String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest.Builder putReqBuilder = 
PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix);
-
-    if (!disableAcl) {
-      putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-    }
-
-    PutObjectRequest putObjectRequest = putReqBuilder.build();
+    PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, 
prefix);
     _s3Client.putObject(putObjectRequest, srcFile.toPath());
   }
 
@@ -496,9 +509,8 @@ public class S3PinotFS extends PinotFS {
         return true;
       }
 
-      ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request
-          .builder().bucket(uri.getHost())
-          .prefix(prefix).maxKeys(2).build();
+      ListObjectsV2Request listObjectsV2Request =
+          
ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build();
       ListObjectsV2Response listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
       return listObjectsV2Response.hasContents();
     } catch (NoSuchKeyException e) {
@@ -526,29 +538,14 @@ public class S3PinotFS extends PinotFS {
       }
 
       String path = sanitizePath(uri.getPath());
-      Map<String, String> mp = new HashMap<>();
-      mp.put("lastModified", String.valueOf(System.currentTimeMillis()));
-      CopyObjectRequest.Builder copyReqBuilder = 
CopyObjectRequest.builder().copySource(encodedUrl)
-          .destinationBucket(uri.getHost()).destinationKey(path)
-          .metadata(mp).metadataDirective(MetadataDirective.REPLACE);
-
-      if (!disableAcl) {
-        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      CopyObjectRequest request = copyReqBuilder.build();
+      CopyObjectRequest request = generateCopyObjectRequest(encodedUrl, uri, 
path,
+          ImmutableMap.of("lastModified", 
String.valueOf(System.currentTimeMillis())));
       _s3Client.copyObject(request);
       long newUpdateTime = 
getS3ObjectMetadata(uri).lastModified().toEpochMilli();
       return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
     } catch (NoSuchKeyException e) {
       String path = sanitizePath(uri.getPath());
-      PutObjectRequest.Builder putReqBuilder = 
PutObjectRequest.builder().bucket(uri.getHost()).key(path);
-
-      if (!disableAcl) {
-        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      PutObjectRequest putObjectRequest = putReqBuilder.build();
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
       _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new 
byte[0]));
       return true;
     } catch (S3Exception e) {
@@ -556,6 +553,41 @@ public class S3PinotFS extends PinotFS {
     }
   }
 
+  private PutObjectRequest generatePutObjectRequest(URI uri, String path) {
+    PutObjectRequest.Builder putReqBuilder = 
PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+
+    if (!_disableAcl) {
+      putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+    }
+
+    if (_serverSideEncryption != null) {
+      
putReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId);
+      if (_ssekmsEncryptionContext != null) {
+        putReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
+      }
+    }
+    return putReqBuilder.build();
+  }
+
+  private CopyObjectRequest generateCopyObjectRequest(String copySource, URI 
dest, String path,
+      Map<String, String> metadata) {
+    CopyObjectRequest.Builder copyReqBuilder =
+        
CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path);
+    if (metadata != null) {
+      
copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE);
+    }
+    if (!_disableAcl) {
+      copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+    }
+    if (_serverSideEncryption != null) {
+      
copyReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId);
+      if (_ssekmsEncryptionContext != null) {
+        copyReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
+      }
+    }
+    return copyReqBuilder.build();
+  }
+
   @Override
   public InputStream open(URI uri)
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to