snazy commented on code in PR #2153: URL: https://github.com/apache/polaris/pull/2153#discussion_r2224986251
########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(OssStsClientProvider.class); + + private final ConcurrentMap<StsDestination, IAcsClient> clientCache = new ConcurrentHashMap<>(); + private final int maxCacheSize; + + public OssStsClientProvider() { + this(50); // Default cache size + } + + public OssStsClientProvider(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + /** + * Perform STS AssumeRole operation to get temporary credentials + */ + public AssumeRoleResponse assumeRole( + String roleArn, String roleSessionName, String externalId, String region, + URI stsEndpoint, String policy, Long durationSeconds) throws ClientException { + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + + "and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables."); + } + + LOGGER.debug("Using OSS credentials from environment variables"); + + // Set default region if not provided + String effectiveRegion = region != null ? region : "cn-hangzhou"; + + // Set STS endpoint + String endpoint = stsEndpoint != null ? stsEndpoint.getHost() : ("sts." + effectiveRegion + ".aliyuncs.com"); + + try { + // Add endpoint for STS service + DefaultProfile.addEndpoint("", "Sts", endpoint); + + // Construct default profile + IClientProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret); + + // Construct client + DefaultAcsClient client = new DefaultAcsClient(profile); + + // Create AssumeRole request + final AssumeRoleRequest request = new AssumeRoleRequest(); + request.setSysMethod(MethodType.POST); + request.setRoleArn(roleArn); + request.setRoleSessionName(roleSessionName); + + if (policy != null) { + request.setPolicy(policy); + } + + if (durationSeconds != null) { + request.setDurationSeconds(durationSeconds); + } + + // Execute AssumeRole request + final AssumeRoleResponse response = client.getAcsResponse(request); + + LOGGER.debug("Successfully assumed role: {}", roleArn); + return response; + + } catch (ClientException e) { + LOGGER.error("Failed to assume OSS role: roleArn={}, error={}", roleArn, e.getErrMsg()); + throw e; + } + } + + /** + * Get or create STS client for the given destination with role configuration + */ + public IAcsClient stsClient(@Nonnull StsDestination destination, + @Nonnull String roleArn, + @Nonnull String roleSessionName, + @Nullable String externalId) { + StsClientKey key = new StsClientKey(destination, roleArn, roleSessionName, externalId); + return clientCache.computeIfAbsent(destination, dest -> createClient(key)); + } + + private IAcsClient createClient(StsClientKey key) { + // Evict entries if cache is full + if (clientCache.size() >= maxCacheSize) { + evictOldestEntry(); + } + + LOGGER.debug("Creating new OSS STS client for destination: {}, roleArn: {}", + key.destination, key.roleArn); + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + + "and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables."); + } + + String effectiveRegion = key.destination.region(); + + // Set STS endpoint + String endpoint = key.destination.endpoint() != null ? + key.destination.endpoint().getHost() : ("sts." + effectiveRegion + ".aliyuncs.com"); + + try { + // Add endpoint for STS service + DefaultProfile.addEndpoint("", "Sts", endpoint); + + // Construct default profile + IClientProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret); + + // Construct and return client + return new DefaultAcsClient(profile); + + } catch (Exception e) { + LOGGER.error("Failed to create STS client: {}", e.getMessage(), e); + throw new RuntimeException("Failed to create STS client", e); + } + } + + private void evictOldestEntry() { + if (!clientCache.isEmpty()) { + // Simple eviction strategy - remove first entry + StsDestination firstKey = clientCache.keySet().iterator().next(); + clientCache.remove(firstKey); + LOGGER.debug("Evicted STS client from cache for: {}", firstKey); + } + } + + /** + * Clear all cached clients + */ + public void clearCache() { + clientCache.clear(); + LOGGER.debug("Cleared OSS STS client cache"); + } + + /** + * Get current cache size + */ + public int getCacheSize() { + return clientCache.size(); + } + + /** + * Represents a destination for STS client creation + */ + public static class StsDestination { Review Comment: Can be drastically simplified by using a Java record or a `@PolarisImmutable`. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssLocation.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import jakarta.annotation.Nonnull; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.polaris.core.storage.StorageLocation; + +public class OssLocation extends StorageLocation { Review Comment: I think it would be better to only leverage `StorageLocation` instead of extending it and override (and change?) its functionality. The additional functionality needs test coverage. ########## polaris-core/build.gradle.kts: ########## @@ -101,9 +101,15 @@ dependencies { implementation(platform(libs.google.cloud.storage.bom)) implementation("com.google.cloud:google-cloud-storage") + implementation("org.apache.iceberg:iceberg-aliyun") + implementation("com.aliyun.oss:aliyun-sdk-oss:3.17.4") Review Comment: I also figured out that the Github repo mentions 3.17.2 as the latest release from Oct 2023, which is concerning as it makes it very difficult to figure out what code is included in the later releases. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStorageConfigurationInfo.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.List; +import java.util.regex.Pattern; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; + +/** OSS Polaris Storage Configuration information */ +public class OssStorageConfigurationInfo extends PolarisStorageConfigurationInfo { Review Comment: This is pretty much the same as what we already have for S3, so I think that the full code duplication isn't necessary. The functionality also deserves test coverage. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(OssStsClientProvider.class); + + private final ConcurrentMap<StsDestination, IAcsClient> clientCache = new ConcurrentHashMap<>(); + private final int maxCacheSize; + + public OssStsClientProvider() { + this(50); // Default cache size + } + + public OssStsClientProvider(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + /** + * Perform STS AssumeRole operation to get temporary credentials + */ + public AssumeRoleResponse assumeRole( + String roleArn, String roleSessionName, String externalId, String region, + URI stsEndpoint, String policy, Long durationSeconds) throws ClientException { + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + + "and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables."); + } + + LOGGER.debug("Using OSS credentials from environment variables"); + + // Set default region if not provided + String effectiveRegion = region != null ? region : "cn-hangzhou"; + + // Set STS endpoint + String endpoint = stsEndpoint != null ? stsEndpoint.getHost() : ("sts." + effectiveRegion + ".aliyuncs.com"); + + try { + // Add endpoint for STS service + DefaultProfile.addEndpoint("", "Sts", endpoint); + + // Construct default profile + IClientProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret); + + // Construct client + DefaultAcsClient client = new DefaultAcsClient(profile); + + // Create AssumeRole request + final AssumeRoleRequest request = new AssumeRoleRequest(); + request.setSysMethod(MethodType.POST); + request.setRoleArn(roleArn); + request.setRoleSessionName(roleSessionName); + + if (policy != null) { + request.setPolicy(policy); + } + + if (durationSeconds != null) { + request.setDurationSeconds(durationSeconds); + } + + // Execute AssumeRole request + final AssumeRoleResponse response = client.getAcsResponse(request); + + LOGGER.debug("Successfully assumed role: {}", roleArn); + return response; + + } catch (ClientException e) { + LOGGER.error("Failed to assume OSS role: roleArn={}, error={}", roleArn, e.getErrMsg()); + throw e; + } + } + + /** + * Get or create STS client for the given destination with role configuration + */ + public IAcsClient stsClient(@Nonnull StsDestination destination, + @Nonnull String roleArn, + @Nonnull String roleSessionName, + @Nullable String externalId) { + StsClientKey key = new StsClientKey(destination, roleArn, roleSessionName, externalId); + return clientCache.computeIfAbsent(destination, dest -> createClient(key)); + } + + private IAcsClient createClient(StsClientKey key) { + // Evict entries if cache is full + if (clientCache.size() >= maxCacheSize) { + evictOldestEntry(); + } + + LOGGER.debug("Creating new OSS STS client for destination: {}, roleArn: {}", + key.destination, key.roleArn); + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + + "and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables."); + } + + String effectiveRegion = key.destination.region(); + + // Set STS endpoint + String endpoint = key.destination.endpoint() != null ? + key.destination.endpoint().getHost() : ("sts." + effectiveRegion + ".aliyuncs.com"); + + try { + // Add endpoint for STS service + DefaultProfile.addEndpoint("", "Sts", endpoint); + + // Construct default profile + IClientProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret); + + // Construct and return client + return new DefaultAcsClient(profile); + + } catch (Exception e) { + LOGGER.error("Failed to create STS client: {}", e.getMessage(), e); + throw new RuntimeException("Failed to create STS client", e); + } + } + + private void evictOldestEntry() { + if (!clientCache.isEmpty()) { + // Simple eviction strategy - remove first entry + StsDestination firstKey = clientCache.keySet().iterator().next(); + clientCache.remove(firstKey); + LOGGER.debug("Evicted STS client from cache for: {}", firstKey); + } + } + + /** + * Clear all cached clients + */ + public void clearCache() { + clientCache.clear(); + LOGGER.debug("Cleared OSS STS client cache"); + } + + /** + * Get current cache size + */ + public int getCacheSize() { + return clientCache.size(); + } + + /** + * Represents a destination for STS client creation + */ + public static class StsDestination { + private final String region; + private final URI endpoint; + + private StsDestination(String region, URI endpoint) { + this.region = region; + this.endpoint = endpoint; + } + + public static StsDestination of(@Nullable URI endpoint, @Nullable String region) { + String effectiveRegion = region != null ? region : "cn-hangzhou"; + return new StsDestination(effectiveRegion, endpoint); + } + + public String region() { + return region; + } + + public URI endpoint() { + return endpoint; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StsDestination that = (StsDestination) o; + return Objects.equals(region, that.region) && Objects.equals(endpoint, that.endpoint); + } + + @Override + public int hashCode() { + return Objects.hash(region, endpoint); + } + + @Override + public String toString() { + return "StsDestination{region='" + region + "', endpoint=" + endpoint + '}'; + } + } + + /** + * Key for caching STS clients with role information + */ + private static class StsClientKey { Review Comment: Can be drastically simplified by using a Java record or a `@PolarisImmutable`. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(OssStsClientProvider.class); + + private final ConcurrentMap<StsDestination, IAcsClient> clientCache = new ConcurrentHashMap<>(); + private final int maxCacheSize; + + public OssStsClientProvider() { + this(50); // Default cache size + } + + public OssStsClientProvider(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + /** + * Perform STS AssumeRole operation to get temporary credentials + */ + public AssumeRoleResponse assumeRole( + String roleArn, String roleSessionName, String externalId, String region, + URI stsEndpoint, String policy, Long durationSeconds) throws ClientException { + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + Review Comment: I'm a bit confused by this message. It says that "OSS" credentials are not available and asks to set "Alibaba" credentials. It if's all "Alibaba", I think that term should be used everywhere. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { Review Comment: I think all client providers should _share_ common functionality. ########## polaris-core/build.gradle.kts: ########## @@ -101,9 +101,15 @@ dependencies { implementation(platform(libs.google.cloud.storage.bom)) implementation("com.google.cloud:google-cloud-storage") + implementation("org.apache.iceberg:iceberg-aliyun") + implementation("com.aliyun.oss:aliyun-sdk-oss:3.17.4") Review Comment: Same is true for the other dependencies, at least some repos do not have any Git tag (e.g. https://github.com/aliyun/aliyun-openapi-java-sdk for `aliyun-java-sdk-core`). ########## polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java: ########## @@ -128,50 +130,59 @@ public Catalog asCatalog() { .build(); } - private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties) { - if (internalProperties.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName())) { - PolarisStorageConfigurationInfo configInfo = getStorageConfigurationInfo(); - if (configInfo instanceof AwsStorageConfigurationInfo) { - AwsStorageConfigurationInfo awsConfig = (AwsStorageConfigurationInfo) configInfo; - return AwsStorageConfigInfo.builder() - .setRoleArn(awsConfig.getRoleARN()) - .setExternalId(awsConfig.getExternalId()) - .setUserArn(awsConfig.getUserARN()) - .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) - .setAllowedLocations(awsConfig.getAllowedLocations()) - .setRegion(awsConfig.getRegion()) - .setEndpoint(awsConfig.getEndpoint()) - .setStsEndpoint(awsConfig.getStsEndpoint()) - .setPathStyleAccess(awsConfig.getPathStyleAccess()) - .build(); - } - if (configInfo instanceof AzureStorageConfigurationInfo) { - AzureStorageConfigurationInfo azureConfig = (AzureStorageConfigurationInfo) configInfo; - return AzureStorageConfigInfo.builder() - .setTenantId(azureConfig.getTenantId()) - .setMultiTenantAppName(azureConfig.getMultiTenantAppName()) - .setConsentUrl(azureConfig.getConsentUrl()) - .setStorageType(AZURE) - .setAllowedLocations(azureConfig.getAllowedLocations()) - .build(); - } - if (configInfo instanceof GcpStorageConfigurationInfo) { - GcpStorageConfigurationInfo gcpConfigModel = (GcpStorageConfigurationInfo) configInfo; - return GcpStorageConfigInfo.builder() - .setGcsServiceAccount(gcpConfigModel.getGcpServiceAccount()) - .setStorageType(StorageConfigInfo.StorageTypeEnum.GCS) - .setAllowedLocations(gcpConfigModel.getAllowedLocations()) - .build(); - } - if (configInfo instanceof FileStorageConfigurationInfo) { - FileStorageConfigurationInfo fileConfigModel = (FileStorageConfigurationInfo) configInfo; - return new FileStorageConfigInfo( - StorageConfigInfo.StorageTypeEnum.FILE, fileConfigModel.getAllowedLocations()); - } - return null; + private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties) { + if (internalProperties.containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName())) { + PolarisStorageConfigurationInfo configInfo = getStorageConfigurationInfo(); + if (configInfo instanceof AwsStorageConfigurationInfo) { + AwsStorageConfigurationInfo awsConfig = (AwsStorageConfigurationInfo) configInfo; + return AwsStorageConfigInfo.builder() + .setRoleArn(awsConfig.getRoleARN()) + .setExternalId(awsConfig.getExternalId()) + .setUserArn(awsConfig.getUserARN()) + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(awsConfig.getAllowedLocations()) + .setRegion(awsConfig.getRegion()) + .build(); Review Comment: This changes (and breaks) the current implementation. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(OssStsClientProvider.class); + + private final ConcurrentMap<StsDestination, IAcsClient> clientCache = new ConcurrentHashMap<>(); Review Comment: Why not a Caffeine cache? ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssStsClientProvider.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.aliyuncs.exceptions.ClientException; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provider for OSS STS clients with caching support using direct STS AssumeRole API + */ +public class OssStsClientProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(OssStsClientProvider.class); + + private final ConcurrentMap<StsDestination, IAcsClient> clientCache = new ConcurrentHashMap<>(); + private final int maxCacheSize; + + public OssStsClientProvider() { + this(50); // Default cache size + } + + public OssStsClientProvider(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + /** + * Perform STS AssumeRole operation to get temporary credentials + */ + public AssumeRoleResponse assumeRole( + String roleArn, String roleSessionName, String externalId, String region, + URI stsEndpoint, String policy, Long durationSeconds) throws ClientException { + + // Get environment variables for authentication + String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); + String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); + + if (accessKeyId == null || accessKeySecret == null) { + throw new RuntimeException( + "OSS credentials not available. Please set ALIBABA_CLOUD_ACCESS_KEY_ID " + + "and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables."); + } + + LOGGER.debug("Using OSS credentials from environment variables"); + + // Set default region if not provided + String effectiveRegion = region != null ? region : "cn-hangzhou"; + + // Set STS endpoint + String endpoint = stsEndpoint != null ? stsEndpoint.getHost() : ("sts." + effectiveRegion + ".aliyuncs.com"); + + try { + // Add endpoint for STS service + DefaultProfile.addEndpoint("", "Sts", endpoint); + + // Construct default profile + IClientProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret); + + // Construct client + DefaultAcsClient client = new DefaultAcsClient(profile); Review Comment: Why does this create a new client, although this class _caches_ clients? ########## service/common/src/main/java/org/apache/polaris/service/catalog/validation/StorageTypeFileIO.java: ########## @@ -32,6 +32,8 @@ enum StorageTypeFileIO { FILE("org.apache.iceberg.hadoop.HadoopFileIO", false), + OSS("org.apache.iceberg.aliyun.oss.OSSFileIO", true), Review Comment: Also think that "OSS" is quite confusing. It's not a commonly known term for an object storage. ########## polaris-core/build.gradle.kts: ########## @@ -101,9 +101,15 @@ dependencies { implementation(platform(libs.google.cloud.storage.bom)) implementation("com.google.cloud:google-cloud-storage") + implementation("org.apache.iceberg:iceberg-aliyun") + implementation("com.aliyun.oss:aliyun-sdk-oss:3.17.4") Review Comment: NB: I recommend that the published `pom.xml` files have the `<scm><tag>` element populated with the corresponding Git tag. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/oss/OssCredentialsStorageIntegration.java: ########## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.oss; + +import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS; + +import com.aliyuncs.AcsRequest; +import com.aliyuncs.AcsResponse; +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.auth.sts.AssumeRoleRequest; +import com.aliyuncs.auth.sts.AssumeRoleResponse; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.Nonnull; +import java.net.URI; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.storage.InMemoryStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.StorageAccessProperty; +import org.apache.polaris.core.storage.StorageUtil; +import org.apache.polaris.core.storage.oss.OssStsClientProvider.StsDestination; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** OSS credential vendor that supports generating temporary credentials using Alibaba Cloud STS */ +public class OssCredentialsStorageIntegration + extends InMemoryStorageIntegration<OssStorageConfigurationInfo> { + + private static final Logger LOGGER = + LoggerFactory.getLogger(OssCredentialsStorageIntegration.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final OssStsClientProvider stsClientProvider; + + public OssCredentialsStorageIntegration() { + this(new OssStsClientProvider()); + } + + + public OssCredentialsStorageIntegration(OssStsClientProvider stsClientProvider) { + super(OssCredentialsStorageIntegration.class.getName()); + this.stsClientProvider = stsClientProvider; + } + + /** {@inheritDoc} */ + @Override + public EnumMap<StorageAccessProperty, String> getSubscopedCreds( + @Nonnull CallContext callContext, + @Nonnull OssStorageConfigurationInfo storageConfig, + boolean allowListOperation, + @Nonnull Set<String> allowedReadLocations, + @Nonnull Set<String> allowedWriteLocations) { + + int storageCredentialDurationSeconds = + callContext.getRealmConfig().getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS); + + EnumMap<StorageAccessProperty, String> credentialMap = + new EnumMap<>(StorageAccessProperty.class); + + try { + // Generate role session name + String roleSessionName = "PolarisOssCredentialsStorageIntegration-" + System.currentTimeMillis(); + + // Generate RAM policy for scoped access + String policy = generateOssPolicy(allowListOperation, allowedReadLocations, allowedWriteLocations); + + // Use STS client provider to assume role directly + AssumeRoleResponse response = stsClientProvider.assumeRole( + storageConfig.getRoleArn(), + roleSessionName, + storageConfig.getExternalId(), + storageConfig.getRegion(), + storageConfig.getStsEndpointUri(), + policy, + (long) storageCredentialDurationSeconds); + + AssumeRoleResponse.Credentials credentials = response.getCredentials(); + + // Populate credential map with STS response + credentialMap.put(StorageAccessProperty.OSS_ACCESS_KEY_ID, credentials.getAccessKeyId()); + credentialMap.put(StorageAccessProperty.OSS_ACCESS_KEY_SECRET, credentials.getAccessKeySecret()); + credentialMap.put(StorageAccessProperty.OSS_SECURITY_TOKEN, credentials.getSecurityToken()); + + // 设置为OSS模式的特殊属性,帮助Apache Iceberg识别这是OSS凭证 + credentialMap.put(StorageAccessProperty.CLIENT_REGION, storageConfig.getRegion() != null ? storageConfig.getRegion() : "cn-hangzhou"); + + // Set expiration time - convert ISO format to milliseconds timestamp + String expirationStr = credentials.getExpiration(); + try { + Instant expirationInstant = Instant.parse(expirationStr); + long expirationMillis = expirationInstant.toEpochMilli(); + credentialMap.put(StorageAccessProperty.EXPIRATION_TIME, String.valueOf(expirationMillis)); + } catch (Exception e) { + LOGGER.warn("Failed to parse expiration time '{}', using current time + 1 hour", expirationStr, e); + // Fallback to current time + 1 hour if parsing fails + long fallbackExpiration = System.currentTimeMillis() + 3600_000; + // 1 hour + credentialMap.put(StorageAccessProperty.EXPIRATION_TIME, String.valueOf(fallbackExpiration)); Review Comment: The assumption here looks odd to me. How can you be sure that the credentials will be valid for the next hour or even at all? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
