This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch s3-kms in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9561db08ce766f1b32d6c7521629fb58b1a4fa65 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Feb 9 15:12:19 2021 -0800 Support S3 with server side encryption mode aws:kms --- .../apache/pinot/plugin/filesystem/S3PinotFS.java | 132 +++++++++++++-------- 1 file changed, 82 insertions(+), 50 deletions(-) diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index fff5a2c..55873ec 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.filesystem; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -30,7 +31,7 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -63,6 +64,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.ServerSideEncryption; /** @@ -74,22 +76,52 @@ public class S3PinotFS extends PinotFS { public static final String REGION = "region"; public static final String ENDPOINT = "endpoint"; public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl"; + public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = "serverSideEncryption"; + public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId"; + public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY = "ssekmsEncryptionContext"; private static final Logger LOGGER = LoggerFactory.getLogger(S3PinotFS.class); private static final String DELIMITER = "/"; public static final String S3_SCHEME = "s3://"; - private static boolean DEFAULT_DISABLE_ACL = true; + private static final boolean DEFAULT_DISABLE_ACL = true; private S3Client _s3Client; - private boolean disableAcl = DEFAULT_DISABLE_ACL; + private boolean _disableAcl = DEFAULT_DISABLE_ACL; + private ServerSideEncryption _serverSideEncryption = null; + private String _ssekmsKeyId; + private String _ssekmsEncryptionContext; @Override public void init(PinotConfiguration config) { Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION))); String region = config.getProperty(REGION); - disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL); + _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL); + String serverSideEncryption = config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY); + if (serverSideEncryption != null) { + try { + _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption); + } catch (Exception e) { + throw new UnsupportedOperationException(String + .format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s", + serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray()))); + } + switch (_serverSideEncryption) { + case AWS_KMS: + _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY); + if (_ssekmsKeyId == null) { + throw new UnsupportedOperationException( + "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption"); + } + _ssekmsEncryptionContext = config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY); + break; + case AES256: + // Todo: Support AES256. + default: + throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption); + } + } AwsCredentialsProvider awsCredentialsProvider; - try { + try { if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && !isNullOrEmpty(config.getProperty(SECRET_KEY))) { String accessKey = config.getProperty(ACCESS_KEY); String secretKey = config.getProperty(SECRET_KEY); @@ -99,7 +131,8 @@ public class S3PinotFS extends PinotFS { awsCredentialsProvider = DefaultCredentialsProvider.create(); } - S3ClientBuilder s3ClientBuilder = S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider); + S3ClientBuilder s3ClientBuilder = + S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider); if (!isNullOrEmpty(config.getProperty(ENDPOINT))) { String endpoint = config.getProperty(ENDPOINT); try { @@ -248,14 +281,7 @@ public class S3PinotFS extends PinotFS { } String dstPath = sanitizePath(dstUri.getPath()); - CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl) - .destinationBucket(dstUri.getHost()).destinationKey(dstPath); - - if (!disableAcl) { - copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); - } - - CopyObjectRequest copyReq = copyReqBuilder.build(); + CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, dstUri, dstPath, null); CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq); return copyObjectResponse.sdkHttpResponse().isSuccessful(); } catch (S3Exception e) { @@ -275,15 +301,8 @@ public class S3PinotFS extends PinotFS { return true; } - PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path); - - if (!disableAcl) { - putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); - } - - PutObjectRequest putObjectRequest = putReqBuilder.build(); + PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path); PutObjectResponse putObjectResponse = _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0])); - return putObjectResponse.sdkHttpResponse().isSuccessful(); } catch (Throwable t) { throw new IOException(t); @@ -477,13 +496,7 @@ public class S3PinotFS extends PinotFS { LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri); URI base = getBase(dstUri); String prefix = sanitizePath(base.relativize(dstUri).getPath()); - PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix); - - if (!disableAcl) { - putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); - } - - PutObjectRequest putObjectRequest = putReqBuilder.build(); + PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix); _s3Client.putObject(putObjectRequest, srcFile.toPath()); } @@ -496,9 +509,8 @@ public class S3PinotFS extends PinotFS { return true; } - ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request - .builder().bucket(uri.getHost()) - .prefix(prefix).maxKeys(2).build(); + ListObjectsV2Request listObjectsV2Request = + ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build(); ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); return listObjectsV2Response.hasContents(); } catch (NoSuchKeyException e) { @@ -526,29 +538,14 @@ public class S3PinotFS extends PinotFS { } String path = sanitizePath(uri.getPath()); - Map<String, String> mp = new HashMap<>(); - mp.put("lastModified", String.valueOf(System.currentTimeMillis())); - CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl) - .destinationBucket(uri.getHost()).destinationKey(path) - .metadata(mp).metadataDirective(MetadataDirective.REPLACE); - - if (!disableAcl) { - copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); - } - - CopyObjectRequest request = copyReqBuilder.build(); + CopyObjectRequest request = generateCopyObjectRequest(encodedUrl, uri, path, + ImmutableMap.of("lastModified", String.valueOf(System.currentTimeMillis()))); _s3Client.copyObject(request); long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli(); return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli(); } catch (NoSuchKeyException e) { String path = sanitizePath(uri.getPath()); - PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path); - - if (!disableAcl) { - putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); - } - - PutObjectRequest putObjectRequest = putReqBuilder.build(); + PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path); _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0])); return true; } catch (S3Exception e) { @@ -556,6 +553,41 @@ public class S3PinotFS extends PinotFS { } } + private PutObjectRequest generatePutObjectRequest(URI uri, String path) { + PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path); + + if (!_disableAcl) { + putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); + } + + if (_serverSideEncryption != null) { + putReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId); + if (_ssekmsEncryptionContext != null) { + putReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext); + } + } + return putReqBuilder.build(); + } + + private CopyObjectRequest generateCopyObjectRequest(String copySource, URI dest, String path, + Map<String, String> metadata) { + CopyObjectRequest.Builder copyReqBuilder = + CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path); + if (metadata != null) { + copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE); + } + if (!_disableAcl) { + copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); + } + if (_serverSideEncryption != null) { + copyReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId); + if (_ssekmsEncryptionContext != null) { + copyReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext); + } + } + return copyReqBuilder.build(); + } + @Override public InputStream open(URI uri) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org