This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6cf197bfa support Amazon S3 storage class (#13613)
a6cf197bfa is described below

commit a6cf197bfa51bc6b5653162d3630c1565ae5b8fa
Author: sullis <git...@seansullivan.com>
AuthorDate: Fri Jul 26 23:40:22 2024 -0700

    support Amazon S3 storage class (#13613)
---
 .../apache/pinot/plugin/filesystem/S3Config.java   | 18 +++++++
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 34 ++++++++++++-
 .../pinot/plugin/filesystem/S3ConfigTest.java      | 23 +++++++++
 .../pinot/plugin/filesystem/S3PinotFSTest.java     | 56 ++++++++++++++++------
 .../pinot/plugin/filesystem/S3TestUtils.java       | 10 +++-
 5 files changed, 124 insertions(+), 17 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
index 3870485f03..ec70b63510 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
@@ -23,12 +23,14 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 
 
 /**
@@ -49,6 +51,8 @@ public class S3Config {
   public static final String ENDPOINT = "endpoint";
   public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
 
+  public static final String STORAGE_CLASS = "storageClass";
+
   // Encryption related configurations
   public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = "serverSideEncryption";
   public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
@@ -76,6 +80,7 @@ public class S3Config {
   private final String _accessKey;
   private final String _secretKey;
   private final String _region;
+  private final String _storageClass;
   private final boolean _disableAcl;
   private final String _endpoint;
 
@@ -100,6 +105,14 @@ public class S3Config {
     _region = pinotConfig.getProperty(REGION);
     _endpoint = pinotConfig.getProperty(ENDPOINT);
 
+    _storageClass = pinotConfig.getProperty(STORAGE_CLASS);
+    if (_storageClass != null) {
+      if (StorageClass.fromValue(_storageClass) == StorageClass.UNKNOWN_TO_SDK_VERSION) {
+        throw new IllegalStateException(
+            "unknown s3 storage class: " + _storageClass + " - Valid storage classes: " + StorageClass.knownValues());
+      }
+    }
+
     _serverSideEncryption = pinotConfig.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
     _ssekmsKeyId = pinotConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
     _ssekmsEncryptionContext = pinotConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
@@ -247,4 +260,9 @@ public class S3Config {
   public ApacheHttpClient.Builder getHttpClientBuilder() {
     return _httpClientBuilder;
   }
+
+  @Nullable
+  public String getStorageClass() {
+    return _storageClass;
+  }
 }
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 4fc84f3541..d1129156a9 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -77,6 +77,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 import software.amazon.awssdk.services.sts.StsClient;
@@ -99,6 +100,7 @@ public class S3PinotFS extends BasePinotFS {
   private String _ssekmsEncryptionContext;
   private long _minObjectSizeToUploadInParts;
   private long _multiPartUploadPartSize;
+  private @Nullable StorageClass _storageClass;
 
   @Override
   public void init(PinotConfiguration config) {
@@ -149,6 +151,12 @@ public class S3PinotFS extends BasePinotFS {
       if (s3Config.getHttpClientBuilder() != null) {
         s3ClientBuilder.httpClientBuilder(s3Config.getHttpClientBuilder());
       }
+
+      if (s3Config.getStorageClass() != null) {
+        _storageClass = StorageClass.fromValue(s3Config.getStorageClass());
+        assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
+      }
+
       _s3Client = s3ClientBuilder.build();
       setMultiPartUploadConfigs(s3Config);
     } catch (S3Exception e) {
@@ -180,6 +188,17 @@ public class S3PinotFS extends BasePinotFS {
     setDisableAcl(s3Config);
   }
 
+  @VisibleForTesting
+  void setStorageClass(@Nullable StorageClass storageClass) {
+    _storageClass = storageClass;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  StorageClass getStorageClass() {
+    return _storageClass;
+  }
+
   private void setServerSideEncryption(@Nullable String serverSideEncryption, S3Config s3Config) {
     if (serverSideEncryption != null) {
       try {
@@ -581,8 +600,13 @@ public class S3PinotFS extends BasePinotFS {
       throws Exception {
     String bucket = dstUri.getHost();
     String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+    CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder();
+    createMultipartUploadRequestBuilder.bucket(bucket).key(prefix);
+    if (_storageClass != null) {
+      createMultipartUploadRequestBuilder.storageClass(_storageClass);
+    }
     CreateMultipartUploadResponse multipartUpload =
-        _s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
+        _s3Client.createMultipartUpload(createMultipartUploadRequestBuilder.build());
     String uploadId = multipartUpload.uploadId();
     // Upload parts sequentially to overcome the 5GB limit of a single PutObject call.
     // TODO: parts can be uploaded in parallel for higher throughput, given a thread pool.
@@ -699,6 +723,11 @@ public class S3PinotFS extends BasePinotFS {
         putReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
       }
     }
+
+    if (_storageClass != null) {
+      putReqBuilder.storageClass(_storageClass);
+    }
+
     return putReqBuilder.build();
   }
 
@@ -706,6 +735,9 @@ public class S3PinotFS extends BasePinotFS {
       Map<String, String> metadata) {
     CopyObjectRequest.Builder copyReqBuilder =
         CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path);
+    if (_storageClass != null) {
+      copyReqBuilder.storageClass(_storageClass);
+    }
     if (metadata != null) {
       copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE);
     }
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java
index 2df60baa9f..6d51f11ff3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3ConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.plugin.filesystem;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 
 
 public class S3ConfigTest {
@@ -44,4 +45,26 @@ public class S3ConfigTest {
     Assert.assertEquals(S3Config.parseDuration("P1DT2H30S"), S3Config.parseDuration("1d2h30s"));
     S3Config.parseDuration("10");
   }
+
+  @Test
+  public void testDefaultStorageClassIsNull() {
+    PinotConfiguration pinotConfig = new PinotConfiguration();
+    S3Config cfg = new S3Config(pinotConfig);
+    Assert.assertNull(cfg.getStorageClass());
+  }
+
+  @Test
+  public void testIntelligentTieringStorageClass() {
+    PinotConfiguration pinotConfig = new PinotConfiguration();
+    pinotConfig.setProperty("storageClass", StorageClass.INTELLIGENT_TIERING.toString());
+    S3Config cfg = new S3Config(pinotConfig);
+    Assert.assertEquals(cfg.getStorageClass(), "INTELLIGENT_TIERING");
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testInvalidStorageClass() {
+    PinotConfiguration pinotConfig = new PinotConfiguration();
+    pinotConfig.setProperty("storageClass", "invalid-storage-class");
+    S3Config cfg = new S3Config(pinotConfig);
+  }
 }
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 34edc0a644..18ca80f004 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -36,6 +36,8 @@ import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -47,6 +49,7 @@ import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 
 
 @Test
@@ -83,9 +86,14 @@ public class S3PinotFSTest {
     FileUtils.deleteQuietly(TEMP_FILE);
   }
 
+  @BeforeMethod
+  public void beforeMethod() {
+    _s3PinotFS.setStorageClass(null);
+  }
+
   private void createEmptyFile(String folderName, String fileName) {
     String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName;
-    _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder),
+    _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder, _s3PinotFS.getStorageClass()),
         RequestBody.fromBytes(new byte[0]));
   }
 
@@ -347,9 +355,12 @@ public class S3PinotFSTest {
     Assert.assertFalse(fileNotExists);
   }
 
-  @Test
-  public void testCopyFromAndToLocal()
+  @Test(dataProvider = "storageClasses")
+  public void testCopyFromAndToLocal(StorageClass storageClass)
       throws Exception {
+
+    _s3PinotFS.setStorageClass(storageClass);
+
     String fileName = "copyFile.txt";
     File fileToCopy = new File(TEMP_FILE, fileName);
     File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile();
@@ -366,9 +377,12 @@ public class S3PinotFSTest {
     }
   }
 
-  @Test
-  public void testMultiPartUpload()
+  @Test(dataProvider = "storageClasses")
+  public void testMultiPartUpload(StorageClass storageClass)
       throws Exception {
+
+    _s3PinotFS.setStorageClass(storageClass);
+
     String fileName = "copyFile_for_multipart.txt";
     File fileToCopy = new File(TEMP_FILE, fileName);
     File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile();
@@ -399,7 +413,9 @@ public class S3PinotFSTest {
     String fileName = "sample.txt";
     String fileContent = "Hello, World";
 
-    _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileName), RequestBody.fromString(fileContent));
+    _s3Client.putObject(
+        S3TestUtils.getPutObjectRequest(BUCKET, fileName, _s3PinotFS.getStorageClass()),
+        RequestBody.fromString(fileContent));
 
     InputStream is = _s3PinotFS.open(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
     String actualContents = IOUtils.toString(is, StandardCharsets.UTF_8);
@@ -418,25 +434,28 @@ public class S3PinotFSTest {
     Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful());
   }
 
-  @Test
-  public void testMoveFile()
+  @Test(dataProvider = "storageClasses")
+  public void testMoveFile(StorageClass storageClass)
       throws Exception {
 
-    String fileName = "file-to-move";
+    _s3PinotFS.setStorageClass(storageClass);
+
+    String sourceFilename = "source-file-" + System.currentTimeMillis();
+    String targetFilename = "target-file-" + System.currentTimeMillis();
     int fileSize = 5000;
 
-    File file = new File(TEMP_FILE, fileName);
+    File file = new File(TEMP_FILE, sourceFilename);
 
     try {
       createDummyFile(file, fileSize);
-      URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName));
+      URI sourceUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, sourceFilename));
 
       _s3PinotFS.copyFromLocalFile(file, sourceUri);
 
       HeadObjectResponse sourceHeadObjectResponse =
-          _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
+          _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, sourceFilename));
 
-      URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, "move-target"));
+      URI targetUri = URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, targetFilename));
 
       boolean moveResult = _s3PinotFS.move(sourceUri, targetUri, false);
       Assert.assertTrue(moveResult);
@@ -445,7 +464,7 @@ public class S3PinotFSTest {
       Assert.assertTrue(_s3PinotFS.exists(targetUri));
 
       HeadObjectResponse targetHeadObjectResponse =
-          _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, "move-target"));
+          _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, targetFilename));
       Assert.assertEquals(targetHeadObjectResponse.contentLength(),
           fileSize);
       Assert.assertEquals(targetHeadObjectResponse.storageClass(),
@@ -469,6 +488,15 @@ public class S3PinotFSTest {
     }
   }
 
+  @DataProvider(name = "storageClasses")
+  public Object[][] createStorageClasses() {
+    return new Object[][] {
+      { null },
+      { StorageClass.STANDARD },
+      { StorageClass.INTELLIGENT_TIERING }
+    };
+  }
+
   private static void createDummyFile(File file, int size)
       throws IOException {
     FileUtils.deleteQuietly(file);
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java
index e7938de109..05a42f8a5f 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3TestUtils.java
@@ -18,17 +18,23 @@
  */
 package org.apache.pinot.plugin.filesystem;
 
+import javax.annotation.Nullable;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 
 
 public class S3TestUtils {
   private S3TestUtils() {
   }
 
-  public static PutObjectRequest getPutObjectRequest(String bucket, String key) {
-    return PutObjectRequest.builder().bucket(bucket).key(key).build();
+  public static PutObjectRequest getPutObjectRequest(String bucket, String key, @Nullable StorageClass storageClass) {
+    PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key);
+    if (storageClass != null) {
+      builder.storageClass(storageClass);
+    }
+    return builder.build();
   }
 
   public static HeadObjectRequest getHeadObjectRequest(String bucket, String key) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to