nastra commented on code in PR #12197: URL: https://github.com/apache/iceberg/pull/12197#discussion_r1993034051
########## aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java: ########## @@ -64,7 +65,9 @@ public AwsCredentials resolveCredentials() { @Override public void close() { - IoUtils.closeQuietly(client, null); + IoUtils.closeQuietlyV2(authSession, null); + IoUtils.closeQuietlyV2(authManager, null); + IoUtils.closeQuietlyV2(client, null); credentialCache.close(); Review Comment: nit: maybe let's move this one also to `IoUtils.closeQuietlyV2(credentialCache, null);` ########## aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.aws.s3.signer; + +import static org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.S3_SIGNER_URI; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.type; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthProperties; +import org.apache.iceberg.rest.auth.AuthSession; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +class TestS3V4RestSignerClient { + + @BeforeAll + static void beforeAll() { + S3V4RestSignerClient.httpClient = Mockito.mock(RESTClient.class); + when(S3V4RestSignerClient.httpClient.withAuthSession(Mockito.any())) + .thenReturn(S3V4RestSignerClient.httpClient); + when(S3V4RestSignerClient.httpClient.postForm( + Mockito.anyString(), + Mockito.eq( + Map.of( + "grant_type", + "client_credentials", + "client_id", + "user", + "client_secret", + "12345", + "scope", + "sign")), + Mockito.eq(OAuthTokenResponse.class), + Mockito.anyMap(), + Mockito.any())) + .thenReturn( + OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); + } + + @AfterAll + static void afterAll() { + S3V4RestSignerClient.httpClient = null; + } + + @ParameterizedTest + @MethodSource("validOAuth2Properties") + void authSessionOAuth2(Map<String, String> properties, String expectedToken) throws Exception { Review Comment: thanks for adding these tests ########## core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java: ########## @@ -81,25 
+85,25 @@ public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, Str this.startTimeMillis = System.currentTimeMillis(); this.authResponse = OAuth2Util.fetchToken( - initClient, - headers, + initClient.withAuthSession(session), + Map.of(), config.credential(), config.scope(), config.oauth2ServerUri(), config.optionalOAuthParams()); return OAuth2Util.AuthSession.fromTokenResponse( - initClient, null, authResponse, startTimeMillis, session); + null, null, authResponse, startTimeMillis, session); Review Comment: can you elaborate why we're not passing the client here and a few lines below anymore? I understand that in both cases we're not doing a refresh but I would probably still just pass a non-null client ########## core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java: ########## @@ -81,25 +85,25 @@ public OAuth2Util.AuthSession initSession(RESTClient initClient, Map<String, Str this.startTimeMillis = System.currentTimeMillis(); this.authResponse = OAuth2Util.fetchToken( - initClient, - headers, + initClient.withAuthSession(session), + Map.of(), config.credential(), config.scope(), config.oauth2ServerUri(), config.optionalOAuthParams()); return OAuth2Util.AuthSession.fromTokenResponse( - initClient, null, authResponse, startTimeMillis, session); + null, null, authResponse, startTimeMillis, session); } else if (config.token() != null) { - return OAuth2Util.AuthSession.fromAccessToken( - initClient, null, config.token(), null, session); + return OAuth2Util.AuthSession.fromAccessToken(null, null, config.token(), null, session); } return session; } @Override public OAuth2Util.AuthSession catalogSession( RESTClient sharedClient, Map<String, String> properties) { - this.client = sharedClient; + // This client will be used for token refreshes; it should not have an auth session. 
Review Comment: might be worth renaming the client to `refreshClient` to make this clearer ########## aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.aws.s3.signer; + +import static org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.S3_SIGNER_URI; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.type; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthProperties; +import org.apache.iceberg.rest.auth.AuthSession; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +class TestS3V4RestSignerClient { + + @BeforeAll + static void beforeAll() { + S3V4RestSignerClient.httpClient = Mockito.mock(RESTClient.class); + when(S3V4RestSignerClient.httpClient.withAuthSession(Mockito.any())) + .thenReturn(S3V4RestSignerClient.httpClient); + when(S3V4RestSignerClient.httpClient.postForm( + Mockito.anyString(), + Mockito.eq( + Map.of( + "grant_type", + "client_credentials", + "client_id", + "user", + "client_secret", + "12345", + "scope", + "sign")), + Mockito.eq(OAuthTokenResponse.class), + Mockito.anyMap(), + Mockito.any())) + .thenReturn( + OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); + } + + @AfterAll + static void afterAll() { + S3V4RestSignerClient.httpClient = null; + } + + @ParameterizedTest + @MethodSource("validOAuth2Properties") + void authSessionOAuth2(Map<String, String> properties, String expectedToken) throws Exception { + Review Comment: nit: extra newline here ########## aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java: ########## @@ 
-200,86 +151,40 @@ private RESTClient httpClient() { return httpClient; } - private AuthSession authSession() { - String token = token().get(); - if (null != token) { - return authSessionCache() - .get( - token, - id -> { - // this client will be reused for token refreshes; it must contain an empty auth - // session in order to avoid interfering with refreshed tokens - RESTClient refreshClient = - httpClient().withAuthSession(org.apache.iceberg.rest.auth.AuthSession.EMPTY); - return AuthSession.fromAccessToken( - refreshClient, - tokenRefreshExecutor(), - token, - expiresAtMillis(properties()), - new AuthSession( - ImmutableMap.of(), - AuthConfig.builder() - .token(token) - .credential(credential()) - .scope(SCOPE) - .oauth2ServerUri(oauth2ServerUri()) - .optionalOAuthParams(optionalOAuthParams()) - .build())); - }); - } - - if (credentialProvided()) { - return authSessionCache() - .get( - credential(), - id -> { - AuthSession session = - new AuthSession( - ImmutableMap.of(), - AuthConfig.builder() - .credential(credential()) - .scope(SCOPE) - .oauth2ServerUri(oauth2ServerUri()) - .optionalOAuthParams(optionalOAuthParams()) - .build()); - long startTimeMillis = System.currentTimeMillis(); - // this client will be reused for token refreshes; it must contain an empty auth - // session in order to avoid interfering with refreshed tokens - RESTClient refreshClient = - httpClient().withAuthSession(org.apache.iceberg.rest.auth.AuthSession.EMPTY); - OAuthTokenResponse authResponse = - OAuth2Util.fetchToken( - refreshClient, - session.headers(), - credential(), - SCOPE, - oauth2ServerUri(), - optionalOAuthParams()); - return AuthSession.fromTokenResponse( - refreshClient, tokenRefreshExecutor(), authResponse, startTimeMillis, session); - }); + @VisibleForTesting + AuthSession authSession() { + if (null == authSession) { + synchronized (S3V4RestSignerClient.class) { + if (null == authSession) { + authManager = AuthManagers.loadAuthManager("s3-signer", properties()); + 
ImmutableMap.Builder<String, String> properties = + ImmutableMap.<String, String>builder() + .putAll(properties()) + .putAll(optionalOAuthParams()) + .put(OAuth2Properties.OAUTH2_SERVER_URI, oauth2ServerUri()) + .put(OAuth2Properties.TOKEN_REFRESH_ENABLED, String.valueOf(keepTokenRefreshed())) + .put(OAuth2Properties.SCOPE, SCOPE); + String token = token().get(); + if (null != token) { + properties.put(OAuth2Properties.TOKEN, token); + } + + if (credentialProvided()) { + properties.put(OAuth2Properties.CREDENTIAL, credential()); + } + + authSession = authManager.catalogSession(httpClient(), properties.buildKeepingLast()); Review Comment: sorry for the late reply here. I do agree with @danielcweeks's observation here. Fundamentally this is a table session because the token is scoped down to a table and using `catalogSession` just makes things conceptually confusing. Introducing & using `tableSession` would make much more sense here and IMO should be done as part of this PR ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java: ########## @@ -100,9 +95,33 @@ public static OAuth2RefreshCredentialsHandler create(Map<String, String> propert } private RESTClient httpClient() { - return HTTPClient.builder(properties) - .uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)) - .withAuthSession(authSession) - .build(); + if (null == client) { + synchronized (this) { + if (null == client) { + authManager = AuthManagers.loadAuthManager("gcp-credentials-refresh", properties); + HTTPClient httpClient = + HTTPClient.builder(properties) + .uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)) + .build(); + authSession = authManager.catalogSession(httpClient, properties); + client = httpClient.withAuthSession(authSession); + } + } + } + + return client; + } + + @Override + public void close() { + CloseableGroup closeableGroup = new CloseableGroup(); + closeableGroup.addCloseable(authSession); + 
closeableGroup.addCloseable(authManager); + closeableGroup.addCloseable(client); Review Comment: I think we should set `closeableGroup.setSuppressCloseFailure(true)` here ########## aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java: ########## @@ -76,14 +79,10 @@ private RESTClient httpClient() { if (null == client) { synchronized (this) { if (null == client) { - DefaultAuthSession authSession = - DefaultAuthSession.of( - HTTPHeaders.of(OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)))); - client = - HTTPClient.builder(properties) - .uri(properties.get(URI)) - .withAuthSession(authSession) - .build(); + authManager = AuthManagers.loadAuthManager("aws-credentials-refresh", properties); Review Comment: ```suggestion authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties); ``` nit: I'd call this rather `s3-...` instead of `aws-...` to be more specific here ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java: ########## @@ -100,9 +95,33 @@ public static OAuth2RefreshCredentialsHandler create(Map<String, String> propert } private RESTClient httpClient() { - return HTTPClient.builder(properties) - .uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)) - .withAuthSession(authSession) - .build(); + if (null == client) { + synchronized (this) { + if (null == client) { + authManager = AuthManagers.loadAuthManager("gcp-credentials-refresh", properties); Review Comment: ```suggestion authManager = AuthManagers.loadAuthManager("gcs-credentials-refresh", properties); ``` ########## aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java: ########## @@ -96,45 +98,10 @@ public static void beforeClass() throws Exception { if (null == httpServer) { httpServer = initHttpServer(); } - - validatingSigner = - new ValidatingSigner( - ImmutableS3V4RestSignerClient.builder() - .properties( - ImmutableMap.of( - S3V4RestSignerClient.S3_SIGNER_URI, - 
httpServer.getURI().toString(), - OAuth2Properties.CREDENTIAL, - "catalog:12345")) - .build(), - new CustomAwsS3V4Signer()); } @AfterAll public static void afterClass() throws Exception { - assertThat(validatingSigner.icebergSigner.tokenRefreshExecutor()) - .isInstanceOf(ScheduledThreadPoolExecutor.class); - - ScheduledThreadPoolExecutor executor = - ((ScheduledThreadPoolExecutor) validatingSigner.icebergSigner.tokenRefreshExecutor()); - // token expiration is set to 10000s by the S3SignerServlet so there should be exactly one token - // scheduled for refresh. Such a high token expiration value is explicitly selected to be much - // larger than TestS3RestSigner would need to execute all tests. - // The reason why this check is done here with a high token expiration is to make sure that - // there aren't other token refreshes being scheduled after every sign request and after - // TestS3RestSigner completes all tests, there should be only this single token in the queue - // that is scheduled for refresh Review Comment: we should make sure to test this somewhere else, since this was making sure that there are no other token refreshes being scheduled after every sign request -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org