Copilot commented on code in PR #2989:
URL: https://github.com/apache/fluss/pull/2989#discussion_r3031065899
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java:
##########
@@ -68,17 +81,27 @@ public S3DelegationTokenProvider(String scheme,
Configuration conf) {
}
public ObtainedSecurityToken obtainSecurityToken() {
- LOG.info("Obtaining session credentials token with access key: {}",
accessKey);
-
- AWSSecurityTokenService stsClient =
- AWSSecurityTokenServiceClientBuilder.standard()
- .withRegion(region)
- .withCredentials(
- new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(accessKey,
secretKey)))
- .build();
- GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
- Credentials credentials = sessionTokenResult.getCredentials();
+ AWSSecurityTokenService stsClient = buildStsClient();
+ Credentials credentials;
+
Review Comment:
`AWSSecurityTokenService` client is created per `obtainSecurityToken()` call
but never shut down. In AWS SDK v1 this can keep HTTP resources/threads alive;
consider wrapping usage in try/finally and calling `stsClient.shutdown()` (or
`close()` if available) after the STS call completes.
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java:
##########
@@ -89,6 +112,23 @@ public ObtainedSecurityToken obtainSecurityToken() {
scheme, toJson(credentials),
credentials.getExpiration().getTime(), additionInfos);
}
+ private AWSSecurityTokenService buildStsClient() {
+ AWSSecurityTokenServiceClientBuilder builder =
+ AWSSecurityTokenServiceClientBuilder.standard()
+ .withCredentials(
+ new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(accessKey,
secretKey)));
Review Comment:
`buildStsClient()` constructs `BasicAWSCredentials(accessKey, secretKey)`
without validating `accessKey/secretKey` are set. If either is missing, this
will fail with a low-level exception; consider adding explicit null/blank
checks with a clearer error explaining which S3 config keys must be provided to
obtain delegation tokens.
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java:
##########
@@ -68,17 +81,27 @@ public S3DelegationTokenProvider(String scheme,
Configuration conf) {
}
public ObtainedSecurityToken obtainSecurityToken() {
- LOG.info("Obtaining session credentials token with access key: {}",
accessKey);
-
- AWSSecurityTokenService stsClient =
- AWSSecurityTokenServiceClientBuilder.standard()
- .withRegion(region)
- .withCredentials(
- new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(accessKey,
secretKey)))
- .build();
- GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
- Credentials credentials = sessionTokenResult.getCredentials();
+ AWSSecurityTokenService stsClient = buildStsClient();
+ Credentials credentials;
+
+ if (roleArn != null) {
+ LOG.info(
+ "Obtaining session credentials via AssumeRole with access
key: {}, role: {}",
+ accessKey,
+ roleArn);
Review Comment:
The `roleArn != null` check treats an empty/blank config value as enabled
and will attempt `AssumeRole` with an invalid ARN, producing a hard-to-diagnose
STS error. Consider normalizing config values (e.g., trim) and validating
`roleArn` is non-blank before entering the AssumeRole branch; otherwise fall
back to `GetSessionToken` or fail fast with a clear message.
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java:
##########
@@ -68,17 +81,27 @@ public S3DelegationTokenProvider(String scheme,
Configuration conf) {
}
public ObtainedSecurityToken obtainSecurityToken() {
- LOG.info("Obtaining session credentials token with access key: {}",
accessKey);
-
- AWSSecurityTokenService stsClient =
- AWSSecurityTokenServiceClientBuilder.standard()
- .withRegion(region)
- .withCredentials(
- new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(accessKey,
secretKey)))
- .build();
- GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
- Credentials credentials = sessionTokenResult.getCredentials();
+ AWSSecurityTokenService stsClient = buildStsClient();
+ Credentials credentials;
+
+ if (roleArn != null) {
+ LOG.info(
+ "Obtaining session credentials via AssumeRole with access
key: {}, role: {}",
+ accessKey,
+ roleArn);
+ AssumeRoleRequest request =
+ new AssumeRoleRequest()
+ .withRoleArn(roleArn)
+ .withRoleSessionName("fluss-" + UUID.randomUUID());
+ AssumeRoleResult result = stsClient.assumeRole(request);
+ credentials = result.getCredentials();
+ } else {
Review Comment:
New AssumeRole path (`roleArn` branch) is not covered by existing S3 token
IT cases (they only exercise `GetSessionToken`). Consider adding a focused test
that verifies the AssumeRole request is used when `fs.s3a.assumed.role.arn` is
configured (e.g., by extracting an STS client factory for mocking, or by using
a local STS-compatible endpoint in tests).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]