This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 662be934a56 [Enhancement] Support refresh AWS credentials (#16553)
662be934a56 is described below

commit 662be934a568301a0acc409dda63f78f75e3210e
Author: Hongkun Xu <[email protected]>
AuthorDate: Mon Aug 11 08:21:35 2025 +0800

    [Enhancement] Support refresh AWS credentials (#16553)
    
    * support to refresh AWS credential
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * Apply suggestion from @Copilot
    
    Co-authored-by: Copilot <[email protected]>
    
    ---------
    
    Signed-off-by: Hongkun Xu <[email protected]>
    Co-authored-by: Xiang Fu <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 208 +++++++++++++++------
 1 file changed, 149 insertions(+), 59 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 64c6c27331c..6146ce75c14 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -47,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -107,6 +109,7 @@ public class S3PinotFS extends BasePinotFS {
   public static final int DELETE_BATCH_SIZE = 1000;
 
   private S3Client _s3Client;
+  private S3Config _s3Config;
   private boolean _disableAcl;
   private ServerSideEncryption _serverSideEncryption = null;
   private String _ssekmsKeyId;
@@ -117,78 +120,159 @@ public class S3PinotFS extends BasePinotFS {
 
   @Override
   public void init(PinotConfiguration config) {
-    S3Config s3Config = new S3Config(config);
-    Preconditions.checkArgument(StringUtils.isNotEmpty(s3Config.getRegion()), 
"Region can't be null or empty");
+    _s3Config = new S3Config(config);
+    initOrRefreshS3Client();
+  }
 
-    _disableAcl = s3Config.getDisableAcl();
-    setServerSideEncryption(s3Config.getServerSideEncryption(), s3Config);
+  public void initOrRefreshS3Client() {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()), 
"Region can't be null or empty");
+
+    _disableAcl = _s3Config.getDisableAcl();
+    setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
 
     AwsCredentialsProvider awsCredentialsProvider;
     try {
-      if (StringUtils.isNotEmpty(s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(s3Config.getSecretKey())) {
+      if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
         AwsBasicCredentials awsBasicCredentials =
-            AwsBasicCredentials.create(s3Config.getAccessKey(), 
s3Config.getSecretKey());
+            AwsBasicCredentials.create(_s3Config.getAccessKey(), 
_s3Config.getSecretKey());
         awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
-      } else if (s3Config.isAnonymousCredentialsProvider()) {
+      } else if (_s3Config.isAnonymousCredentialsProvider()) {
         awsCredentialsProvider = AnonymousCredentialsProvider.create();
       } else {
         awsCredentialsProvider = DefaultCredentialsProvider.builder().build();
       }
 
       // IAM Role based access
-      if (s3Config.isIamRoleBasedAccess()) {
+      if (_s3Config.isIamRoleBasedAccess()) {
         AssumeRoleRequest.Builder assumeRoleRequestBuilder =
-            
AssumeRoleRequest.builder().roleArn(s3Config.getRoleArn()).roleSessionName(s3Config.getRoleSessionName())
-                .durationSeconds(s3Config.getSessionDurationSeconds());
+            
AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn()).roleSessionName(_s3Config.getRoleSessionName())
+                .durationSeconds(_s3Config.getSessionDurationSeconds());
         AssumeRoleRequest assumeRoleRequest;
-        if (StringUtils.isNotEmpty(s3Config.getExternalId())) {
-          assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(s3Config.getExternalId()).build();
+        if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
+          assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
         } else {
           assumeRoleRequest = assumeRoleRequestBuilder.build();
         }
         StsClient stsClient =
-            
StsClient.builder().region(Region.of(s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
+            
StsClient.builder().region(Region.of(_s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
                 .build();
         awsCredentialsProvider =
             
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
-                
.asyncCredentialUpdateEnabled(s3Config.isAsyncSessionUpdateEnabled()).build();
+                
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
       }
 
-      S3ClientBuilder s3ClientBuilder = 
S3Client.builder().forcePathStyle(true).region(Region.of(s3Config.getRegion()))
-          
.credentialsProvider(awsCredentialsProvider).crossRegionAccessEnabled(s3Config.isCrossRegionAccessEnabled());
-      if (StringUtils.isNotEmpty(s3Config.getEndpoint())) {
+      S3ClientBuilder s3ClientBuilder = 
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
+          
.credentialsProvider(awsCredentialsProvider).crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
+      if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
         try {
-          s3ClientBuilder.endpointOverride(new URI(s3Config.getEndpoint()));
+          s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
         } catch (URISyntaxException e) {
           throw new RuntimeException(e);
         }
       }
-      if (s3Config.getHttpClientBuilder() != null) {
-        s3ClientBuilder.httpClientBuilder(s3Config.getHttpClientBuilder());
+      if (_s3Config.getHttpClientBuilder() != null) {
+        s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
       }
 
-      if (s3Config.getStorageClass() != null) {
-        _storageClass = StorageClass.fromValue(s3Config.getStorageClass());
+      if (_s3Config.getStorageClass() != null) {
+        _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
         assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
       }
 
-      if (s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
+      if (_s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
         
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
       }
-      if (s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
+      if (_s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
         
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
       }
-      if (s3Config.useLegacyMd5Plugin()) {
+      if (_s3Config.useLegacyMd5Plugin()) {
         s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
       }
 
       _s3Client = s3ClientBuilder.build();
-      setMultiPartUploadConfigs(s3Config);
+      setMultiPartUploadConfigs(_s3Config);
     } catch (S3Exception e) {
       throw new RuntimeException("Could not initialize S3PinotFS", e);
     }
   }
 
+  /**
+   * Masks a sensitive key for logging, keeping the first and last 3 characters and replacing every character in
+   * between with '*'.
+   *
+   * <p>Returns "***" when the key is null or shorter than 12 characters. With fewer than 12 characters the 6 visible
+   * characters would outnumber the hidden ones (a 7-character key would expose 6 of its 7 characters), so such keys
+   * are masked completely.
+   *
+   * @param key the sensitive key string to mask, may be null
+   * @return the masked key string, never null
+   */
+  private static String maskKey(String key) {
+    final int visiblePerSide = 3;
+    // Require at least as many hidden characters as revealed ones; otherwise reveal nothing at all.
+    if (key == null || key.length() < 4 * visiblePerSide) {
+      return "***";
+    }
+    int length = key.length();
+    StringBuilder masked = new StringBuilder(length);
+    masked.append(key, 0, visiblePerSide);
+    for (int i = visiblePerSide; i < length - visiblePerSide; i++) {
+      masked.append('*');
+    }
+    masked.append(key, length - visiblePerSide, length);
+    return masked.toString();
+  }
+
+  /**
+   * Logs, at WARN level, a masked form of the access key id currently resolved by the S3 client.
+   *
+   * <p>The secret access key is intentionally never written to the log, not even partially. This method is
+   * diagnostic only: a failure to resolve credentials is logged and swallowed so it cannot interrupt the caller.
+   *
+   * @param when free-text label inserted into the log message (for example "BEFORE refresh")
+   */
+  private void logAwsCredentials(String when) {
+    AwsCredentials credentials;
+    try {
+      credentials = getAwsCredentials();
+    } catch (RuntimeException e) {
+      // getAwsCredentials() can throw; logging must not break the refresh/retry flow that calls this method.
+      LOGGER.warn("S3 credentials {} - Unable to retrieve AWS credentials "
+          + "(access key & secret key unavailable)", when, e);
+      return;
+    }
+    if (credentials == null) {
+      LOGGER.warn("S3 credentials {} - Unable to retrieve AWS credentials "
+          + "(access key & secret key unavailable)", when);
+      return;
+    }
+    // Only the access key id identifies which credentials are in use; secret material stays out of the logs.
+    LOGGER.warn("S3 credentials {} - Access Key: {}", when, maskKey(credentials.accessKeyId()));
+  }
+
+  /**
+   * Resolves credentials through the credentials provider configured on the current S3 client.
+   *
+   * @return the {@link AwsCredentials} returned by the provider's {@code resolveCredentials()}
+   * @throws IllegalStateException if the configured provider is null or is not an {@link AwsCredentialsProvider}
+   */
+  public AwsCredentials getAwsCredentials() {
+    Object credentialsProvider = _s3Client.serviceClientConfiguration().credentialsProvider();
+    if (!(credentialsProvider instanceof AwsCredentialsProvider)) {
+      // instanceof is false for null as well, so the message has to cover the missing-provider case.
+      String providerType = credentialsProvider == null ? "null" : credentialsProvider.getClass().getName();
+      throw new IllegalStateException(
+          "S3 client credentialsProvider is not an AwsCredentialsProvider: " + providerType);
+    }
+    return ((AwsCredentialsProvider) credentialsProvider).resolveCredentials();
+  }
+
+  /**
+   * Executes the given S3 operation. If it fails with an {@link S3Exception} whose status code is 401 or 403, the S3
+   * client is rebuilt through {@link #initOrRefreshS3Client()} and the operation is executed one more time.
+   *
+   * <p>An {@link S3Exception} is always rethrown unchanged, whether it comes from the first attempt (with a status
+   * other than 401/403) or from the retry. Callers that catch {@link S3Exception} or one of its subtypes (for example
+   * {@code NoSuchKeyException}) therefore see the same exception type on either attempt.
+   *
+   * @param action the S3 operation to execute; it may be invoked twice
+   * @param <T> the type of the result returned by the operation
+   * @return the result of the S3 operation
+   * @throws IOException if the retry fails with an exception that is not an {@link S3Exception}
+   */
+  private <T> T retryWithS3CredentialRefresh(Supplier<T> action) throws IOException {
+    try {
+      return action.get();
+    } catch (S3Exception e) {
+      int statusCode = e.statusCode();
+      // Only attempt credential refresh and retry for S3Exception with 401/403 status code
+      if (statusCode != 401 && statusCode != 403) {
+        throw e;
+      }
+      LOGGER.warn("Caught S3 authentication/authorization exception ({}), "
+          + "attempting to refresh credentials and retry", statusCode);
+      try {
+        logAwsCredentials("BEFORE refresh");
+        initOrRefreshS3Client();
+        logAwsCredentials("AFTER refresh");
+      } catch (RuntimeException refreshFailure) {
+        // Keep the auth failure that triggered the refresh attached when the refresh itself fails.
+        refreshFailure.addSuppressed(e);
+        throw refreshFailure;
+      }
+      try {
+        return action.get();
+      } catch (S3Exception retryException) {
+        // Do not wrap: callers rely on catching S3Exception subtypes such as NoSuchKeyException.
+        throw retryException;
+      } catch (Exception retryException) {
+        throw new IOException("Unexpected exception during S3 operation after credential refresh", retryException);
+      }
+    }
+  }
+
   /**
    * Initialized the _s3Client directly with provided client.
    * This initialization method will not initialize the server side encryption
@@ -250,13 +334,14 @@ public class S3PinotFS extends BasePinotFS {
     }
   }
 
-  private HeadObjectResponse getS3ObjectMetadata(URI uri)
-      throws IOException {
+  private HeadObjectResponse getS3ObjectMetadata(URI uri) throws IOException {
     URI base = getBase(uri);
     String path = sanitizePath(base.relativize(uri).getPath());
-    HeadObjectRequest headObjectRequest = 
HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build();
-
-    return _s3Client.headObject(headObjectRequest);
+    HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
+        .bucket(uri.getHost())
+        .key(path)
+        .build();
+    return retryWithS3CredentialRefresh(() -> 
_s3Client.headObject(headObjectRequest));
   }
 
   private boolean isPathTerminatedByDelimiter(URI uri) {
@@ -315,7 +400,7 @@ public class S3PinotFS extends BasePinotFS {
       String path = sanitizePath(base.relativize(uri).getPath());
       HeadObjectRequest headObjectRequest = 
HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build();
 
-      _s3Client.headObject(headObjectRequest);
+      retryWithS3CredentialRefresh(() -> 
_s3Client.headObject(headObjectRequest));
       return true;
     } catch (NoSuchKeyException e) {
       return false;
@@ -345,7 +430,7 @@ public class S3PinotFS extends BasePinotFS {
     }
 
     ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
-    listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
+    listObjectsV2Response = retryWithS3CredentialRefresh(() -> 
_s3Client.listObjectsV2(listObjectsV2Request));
 
     for (S3Object s3Object : listObjectsV2Response.contents()) {
       if (s3Object.key().equals(prefix)) {
@@ -372,7 +457,7 @@ public class S3PinotFS extends BasePinotFS {
 
       String dstPath = sanitizePath(dstUri.getPath());
       CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, 
dstUri, dstPath, null);
-      CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
+      CopyObjectResponse copyObjectResponse = retryWithS3CredentialRefresh(() 
-> _s3Client.copyObject(copyReq));
       return copyObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (S3Exception e) {
       throw new IOException(e);
@@ -392,7 +477,8 @@ public class S3PinotFS extends BasePinotFS {
       }
 
       PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
-      PutObjectResponse putObjectResponse = 
_s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
+      PutObjectResponse putObjectResponse = retryWithS3CredentialRefresh(() ->
+          _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new 
byte[0])));
       return putObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (Throwable t) {
       throw new IOException(t);
@@ -450,14 +536,14 @@ public class S3PinotFS extends BasePinotFS {
     }
   }
 
-  private boolean processBatch(String bucket, List<ObjectIdentifier> 
objectsToDelete) {
+  private boolean processBatch(String bucket, List<ObjectIdentifier> 
objectsToDelete) throws IOException {
     LOGGER.info("Deleting batch of {} objects", objectsToDelete.size());
     DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
         .bucket(bucket)
         .delete(Delete.builder().objects(objectsToDelete).build())
         .build();
 
-    DeleteObjectsResponse deleteResponse = 
_s3Client.deleteObjects(deleteRequest);
+    DeleteObjectsResponse deleteResponse = retryWithS3CredentialRefresh(() -> 
_s3Client.deleteObjects(deleteRequest));
     LOGGER.info("Failed to delete {} objects", deleteResponse.hasErrors() ? 
deleteResponse.errors().size() : 0);
     return deleteResponse.deleted().size() == objectsToDelete.size();
   }
@@ -490,17 +576,17 @@ public class S3PinotFS extends BasePinotFS {
 
         if (prefix.equals(DELIMITER)) {
           ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
-          listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+          listObjectsV2Response = retryWithS3CredentialRefresh(() -> 
_s3Client.listObjectsV2(listObjectsV2Request));
         } else {
           ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.prefix(prefix).build();
-          listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+          listObjectsV2Response = retryWithS3CredentialRefresh(() -> 
_s3Client.listObjectsV2(listObjectsV2Request));
         }
         boolean deleteSucceeded = true;
         for (S3Object s3Object : listObjectsV2Response.contents()) {
           DeleteObjectRequest deleteObjectRequest =
               
DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(s3Object.key()).build();
-
-          DeleteObjectResponse deleteObjectResponse = 
_s3Client.deleteObject(deleteObjectRequest);
+          DeleteObjectResponse deleteObjectResponse = 
retryWithS3CredentialRefresh(() ->
+              _s3Client.deleteObject(deleteObjectRequest));
 
           deleteSucceeded &= 
deleteObjectResponse.sdkHttpResponse().isSuccessful();
         }
@@ -510,7 +596,8 @@ public class S3PinotFS extends BasePinotFS {
         DeleteObjectRequest deleteObjectRequest =
             
DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(prefix).build();
 
-        DeleteObjectResponse deleteObjectResponse = 
_s3Client.deleteObject(deleteObjectRequest);
+        DeleteObjectResponse deleteObjectResponse = 
retryWithS3CredentialRefresh(() ->
+            _s3Client.deleteObject(deleteObjectRequest));
 
         return deleteObjectResponse.sdkHttpResponse().isSuccessful();
       }
@@ -674,7 +761,8 @@ public class S3PinotFS extends BasePinotFS {
         }
         ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
         LOGGER.debug("Trying to send ListObjectsV2Request {}", 
listObjectsV2Request);
-        ListObjectsV2Response listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+        ListObjectsV2Response listObjectsV2Response = 
retryWithS3CredentialRefresh(() ->
+            _s3Client.listObjectsV2(listObjectsV2Request));
         LOGGER.debug("Getting ListObjectsV2Response: {}", 
listObjectsV2Response);
         List<S3Object> filesReturned = listObjectsV2Response.contents();
         filesReturned.forEach(objectVisitor);
@@ -692,14 +780,15 @@ public class S3PinotFS extends BasePinotFS {
 
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
-      throws Exception {
-    LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath());
-    URI base = getBase(srcUri);
-    FileUtils.forceMkdir(dstFile.getParentFile());
-    String prefix = sanitizePath(base.relativize(srcUri).getPath());
-    GetObjectRequest getObjectRequest = 
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();
+      throws IOException {
+      LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath());
+      URI base = getBase(srcUri);
+      FileUtils.forceMkdir(dstFile.getParentFile());
+      String prefix = sanitizePath(base.relativize(srcUri).getPath());
+      GetObjectRequest getObjectRequest = 
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();
 
-    _s3Client.getObject(getObjectRequest, ResponseTransformer.toFile(dstFile));
+      retryWithS3CredentialRefresh(() ->
+          _s3Client.getObject(getObjectRequest, 
ResponseTransformer.toFile(dstFile)));
   }
 
   @Override
@@ -712,7 +801,7 @@ public class S3PinotFS extends BasePinotFS {
       LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), 
dstUri);
       String prefix = 
sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
       PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, 
prefix);
-      _s3Client.putObject(putObjectRequest, srcFile.toPath());
+      retryWithS3CredentialRefresh(() -> _s3Client.putObject(putObjectRequest, 
srcFile.toPath()));
     }
   }
 
@@ -725,8 +814,8 @@ public class S3PinotFS extends BasePinotFS {
     if (_storageClass != null) {
       createMultipartUploadRequestBuilder.storageClass(_storageClass);
     }
-    CreateMultipartUploadResponse multipartUpload =
-        
_s3Client.createMultipartUpload(createMultipartUploadRequestBuilder.build());
+    CreateMultipartUploadResponse multipartUpload = 
retryWithS3CredentialRefresh(() ->
+        
_s3Client.createMultipartUpload(createMultipartUploadRequestBuilder.build()));
     String uploadId = multipartUpload.uploadId();
     // Upload parts sequentially to overcome the 5GB limit of a single 
PutObject call.
     // TODO: parts can be uploaded in parallel for higher throughput, given a 
thread pool.
@@ -757,9 +846,9 @@ public class S3PinotFS extends BasePinotFS {
         partNum++;
       }
       // complete the multipart upload
-      _s3Client.completeMultipartUpload(
+      retryWithS3CredentialRefresh(() -> _s3Client.completeMultipartUpload(
           
CompleteMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucket).key(prefix)
-              
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()).build());
+              
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()).build()));
     } catch (Exception e) {
       LOGGER.error("Failed to upload file {} to {} in parts. Abort upload 
request: {}", srcFile, dstUri, uploadId, e);
       _s3Client.abortMultipartUpload(
@@ -793,7 +882,8 @@ public class S3PinotFS extends BasePinotFS {
 
       ListObjectsV2Request listObjectsV2Request =
           
ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build();
-      ListObjectsV2Response listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+      ListObjectsV2Response listObjectsV2Response = 
retryWithS3CredentialRefresh(() ->
+          _s3Client.listObjectsV2(listObjectsV2Request));
       return listObjectsV2Response.hasContents();
     } catch (NoSuchKeyException e) {
       LOGGER.error("Could not get directory entry for {}", uri);
@@ -817,13 +907,13 @@ public class S3PinotFS extends BasePinotFS {
       String path = sanitizePath(uri.getPath());
       CopyObjectRequest request = generateCopyObjectRequest(encodedUrl, uri, 
path,
           ImmutableMap.of("lastModified", 
String.valueOf(System.currentTimeMillis())));
-      _s3Client.copyObject(request);
+      retryWithS3CredentialRefresh(() -> _s3Client.copyObject(request));
       long newUpdateTime = 
getS3ObjectMetadata(uri).lastModified().toEpochMilli();
       return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
     } catch (NoSuchKeyException e) {
       String path = sanitizePath(uri.getPath());
       PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
-      _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new 
byte[0]));
+      retryWithS3CredentialRefresh(() -> _s3Client.putObject(putObjectRequest, 
RequestBody.fromBytes(new byte[0])));
       return true;
     } catch (S3Exception e) {
       throw new IOException(e);
@@ -880,7 +970,7 @@ public class S3PinotFS extends BasePinotFS {
       String path = sanitizePath(uri.getPath());
       GetObjectRequest getObjectRequest = 
GetObjectRequest.builder().bucket(uri.getHost()).key(path).build();
 
-      return _s3Client.getObject(getObjectRequest);
+      return retryWithS3CredentialRefresh(() -> 
_s3Client.getObject(getObjectRequest));
     } catch (S3Exception e) {
       throw e;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to