This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2d0cbc19de Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988) 2d0cbc19de is described below commit 2d0cbc19dec47d3aa9228abd85ffba3a9abe821a Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Tue Dec 13 22:55:24 2022 -0800 Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988) * Initialize S3PinotFs serverSideEncryption properties when using init method that takes S3Client directly * Remve unused --- .../apache/pinot/plugin/filesystem/S3PinotFS.java | 79 ++++++++++++++-------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index 511f0c50b1..99cf60a89c 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.BasePinotFS; @@ -98,31 +99,9 @@ public class S3PinotFS extends BasePinotFS { String region = config.getProperty(REGION); _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL); String serverSideEncryption = config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY); - if (serverSideEncryption != null) { - try { - _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption); - } catch (Exception e) { - throw new UnsupportedOperationException(String - .format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s", - serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray()))); - } - switch (_serverSideEncryption) { - case AWS_KMS: - _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY); - if (_ssekmsKeyId == null) { - throw new UnsupportedOperationException( - "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption"); - } - _ssekmsEncryptionContext = config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY); - break; - case AES256: - // Todo: Support AES256. - default: - throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption); - } - } - AwsCredentialsProvider awsCredentialsProvider; + setServerSideEncryption(serverSideEncryption, config); + AwsCredentialsProvider awsCredentialsProvider; try { if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && !isNullOrEmpty(config.getProperty(SECRET_KEY))) { String accessKey = config.getProperty(ACCESS_KEY); @@ -149,10 +128,53 @@ public class S3PinotFS extends BasePinotFS { } } + /** + * Initialized the _s3Client directly with provided client. + * This initialization method will not initialize the server side encryption + * @param s3Client s3Client to initialize with + */ public void init(S3Client s3Client) { _s3Client = s3Client; } + /** + * Initialize the _s3Client directly with provided client, along with additional server side encryption related props + * @param s3Client s3Client to initialize with + * @param serverSideEncryption the server side encryption string e.g. AWS_KMS is the only supported on as of now + * @param serverSideEncryptionConfig properties specific to provided server side encryption type + */ + public void init(S3Client s3Client, String serverSideEncryption, PinotConfiguration serverSideEncryptionConfig) { + _s3Client = s3Client; + setServerSideEncryption(serverSideEncryption, serverSideEncryptionConfig); + } + + private void setServerSideEncryption(@Nullable String serverSideEncryption, + PinotConfiguration serverSideEncryptionConfig) { + if (serverSideEncryption != null) { + try { + _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption); + } catch (Exception e) { + throw new UnsupportedOperationException( + String.format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s", + serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray()))); + } + switch (_serverSideEncryption) { + case AWS_KMS: + _ssekmsKeyId = serverSideEncryptionConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY); + if (_ssekmsKeyId == null) { + throw new UnsupportedOperationException( + "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption"); + } + _ssekmsEncryptionContext = serverSideEncryptionConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY); + break; + case AES256: + // Todo: Support AES256. + default: + throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption); + } + } + } + boolean isNullOrEmpty(String target) { return target == null || target.isEmpty(); } @@ -318,9 +340,8 @@ public class S3PinotFS extends BasePinotFS { try { if (isDirectory(segmentUri)) { if (!forceDelete) { - Preconditions - .checkState(isEmptyDirectory(segmentUri), "ForceDelete flag is not set and directory '%s' is not empty", - segmentUri); + Preconditions.checkState(isEmptyDirectory(segmentUri), + "ForceDelete flag is not set and directory '%s' is not empty", segmentUri); } String prefix = normalizeToDirectoryPrefix(segmentUri); ListObjectsV2Response listObjectsV2Response; @@ -456,8 +477,8 @@ public class S3PinotFS extends BasePinotFS { ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder(); visitFiles(fileUri, recursive, s3Object -> { if (!s3Object.key().equals(fileUri.getPath())) { - FileMetadata.Builder fileBuilder = new FileMetadata.Builder() - .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object)) + FileMetadata.Builder fileBuilder = new FileMetadata.Builder().setFilePath( + S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object)) .setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size()) .setIsDirectory(s3Object.key().endsWith(DELIMITER)); listBuilder.add(fileBuilder.build()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org