snazy commented on code in PR #2280:
URL: https://github.com/apache/polaris/pull/2280#discussion_r2272725103


##########
api/iceberg-aws-sign-service/src/main/java/org/apache/polaris/service/aws/sign/model/PolarisS3SignRequest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.aws.sign.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.iceberg.aws.s3.signer.S3SignRequest;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/**
+ * Request for S3 signing requests.
+ *
+ * <p>Copy of {@link S3SignRequest}, because the original does not have 
Jackson annotations.
+ */
+@PolarisImmutable
+@JsonDeserialize(as = ImmutablePolarisS3SignRequest.class)
+@JsonSerialize(as = ImmutablePolarisS3SignRequest.class)
+public interface PolarisS3SignRequest extends S3SignRequest {
+
+  @Value.Default
+  @Nullable // Replace javax.annotation.Nullable with 
jakarta.annotation.Nullable

Review Comment:
   ```suggestion
     @Nullable // Replace javax.annotation.Nullable from `S3SignRequest` with 
jakarta.annotation.Nullable
   ```



##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -349,4 +349,20 @@ public static void enforceFeatureEnabledOrThrow(
                   + "it is still possible to enforce the uniqueness of table 
locations within a catalog.")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> REMOTE_SIGNING_ENABLED =
+      PolarisConfiguration.<Boolean>builder()
+          .key("REMOTE_SIGNING_ENABLED")
+          .catalogConfig("polaris.config.remote-signing.enabled")
+          .description(
+              "If true, the remote signing endpoints are enabled either 
globally, or for a specific catalog.")

Review Comment:
   Maybe add a note that the feature is experimental, elaborating why in the 
`CHANGELOG.md`?



##########
polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java:
##########
@@ -39,6 +42,21 @@ public enum StorageAccessProperty {
       Boolean.class, "s3.path-style-access", "whether to use S3 path style 
access", false),
   CLIENT_REGION(
       String.class, "client.region", "region to configure client for making 
requests to AWS"),
+  AWS_REMOTE_SIGNING_ENABLED(
+      Boolean.class,
+      S3FileIOProperties.REMOTE_SIGNING_ENABLED,
+      "whether to enable remote signing for S3 requests",

Review Comment:
   Maybe add some text saying that this depends on the "global" flag.



##########
polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisEndpoints.java:
##########
@@ -94,4 +101,18 @@ public static Set<Endpoint> 
getSupportedPolicyEndpoints(CallContext callContext)
         
callContext.getRealmConfig().getConfig(FeatureConfiguration.ENABLE_POLICY_STORE);
     return policyStoreEnabled ? POLICY_STORE_ENDPOINTS : ImmutableSet.of();
   }
+
+  /**
+   * Get the remote signing endpoints. Returns {@link 
#REMOTE_SIGNING_ENDPOINTS} if {@link
+   * FeatureConfiguration#REMOTE_SIGNING_ENABLED} is set globally to true or 
if the catalog enables
+   * remote signing; otherwise, returns an empty set.
+   */
+  public static Set<Endpoint> getSupportedRemoteSigningEndpoints(
+      CallContext callContext, CatalogEntity catalogEntity) {
+    boolean remoteSigningEnabled =
+        callContext
+            .getRealmConfig()

Review Comment:
   ```suggestion
         RealmConfig realmConfig, CatalogEntity catalogEntity) {
       boolean remoteSigningEnabled =
           realmConfig
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -774,21 +757,55 @@ public Optional<LoadTableResponse> 
loadTableWithAccessDelegationIfStale(
     throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
   }
 
+  private CatalogEntity getCatalogEntity() {
+    PolarisResolvedPathWrapper catalogPath = 
resolutionManifest.getResolvedReferenceCatalogEntity();
+    callContext
+        .getPolarisCallContext()
+        .getDiagServices()
+        .checkNotNull(catalogPath, "No catalog available for loadTable 
request");
+    CatalogEntity catalogEntity = 
CatalogEntity.of(catalogPath.getRawLeafEntity());
+    LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
+    return catalogEntity;
+  }
+
   private LoadTableResponse.Builder 
buildLoadTableResponseWithDelegationCredentials(
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
       Set<PolarisStorageActions> actions,
-      String snapshots) {
+      Set<AccessDelegationMode> delegationModes,
+      CatalogEntity catalogEntity) {
     LoadTableResponse.Builder responseBuilder =
         LoadTableResponse.builder().withTableMetadata(tableMetadata);
-    if (baseCatalog instanceof SupportsCredentialDelegation 
credentialDelegation) {
+    if (baseCatalog instanceof SupportsCredentialDelegation 
credentialDelegation
+        && delegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
       LOGGER
           .atDebug()
           .addKeyValue("tableIdentifier", tableIdentifier)
           .addKeyValue("tableLocation", tableMetadata.location())
           .log("Fetching client credentials for table");
       AccessConfig accessConfig =
-          credentialDelegation.getAccessConfig(tableIdentifier, tableMetadata, 
actions);
+          credentialDelegation.getAccessConfigForCredentialDelegation(
+              tableIdentifier, tableMetadata, actions);
+      Map<String, String> credentialConfig = accessConfig.credentials();
+      responseBuilder.addAllConfig(credentialConfig);
+      responseBuilder.addAllConfig(accessConfig.extraProperties());
+      if (!credentialConfig.isEmpty()) {
+        responseBuilder.addCredential(
+            ImmutableCredential.builder()
+                .prefix(tableMetadata.location())
+                .config(credentialConfig)
+                .build());
+      }

Review Comment:
   Looks like this part is duplicated. Any chance to remove the duplicated code?



##########
runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java:
##########
@@ -92,7 +104,10 @@ public Catalog createCallContextCatalog(
             securityContext,
             taskExecutor,
             fileIOFactory,
-            polarisEventListener);
+            polarisEventListener,
+            storageIntegrationProvider,
+            prefixParser,
+            uriInfo);

Review Comment:
   (Not related to this PR)
   The consistently increasing number of parameters passed to these types 
worries me.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -866,13 +888,52 @@ public AccessConfig getAccessConfig(
       return AccessConfig.builder().build();
     }
     return FileIOUtil.refreshAccessConfig(
-        callContext,
-        storageCredentialCache,
-        getCredentialVendor(),
-        tableIdentifier,
-        getLocationsAllowedToBeAccessed(tableMetadata),
-        storageActions,
-        storageInfo.get());
+            callContext,
+            storageCredentialCache,
+            getCredentialVendor(),
+            tableIdentifier,
+            getLocationsAllowedToBeAccessed(tableMetadata),
+            storageActions,
+            storageInfo.get())
+        .orElse(AccessConfig.EMPTY);
+  }
+
+  @Override
+  public AccessConfig getAccessConfigForRemoteSigning(TableIdentifier 
tableIdentifier) {
+
+    Optional<PolarisStorageConfigurationInfo> configurationInfo =
+        findStorageInfo(tableIdentifier)
+            .map(
+                info ->
+                    deserialize(

Review Comment:
   Nit: move the deserialize() to a separate .map() for readability.



##########
integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisS3RemoteSigningIntegrationTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it.test;
+
+import static 
org.apache.polaris.core.storage.StorageAccessProperty.AWS_ENDPOINT;
+import static 
org.apache.polaris.core.storage.StorageAccessProperty.AWS_PATH_STYLE_ACCESS;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.service.it.env.CatalogConfig;
+import org.apache.polaris.service.it.env.RestCatalogConfig;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** Runs PolarisRestCatalogIntegrationBase test on S3 with remote signing 
enabled. */
+@CatalogConfig(properties = {"header.X-Iceberg-Access-Delegation", 
"remote-signing"})
+@RestCatalogConfig({
+  // The default client file IO implementation is InMemoryFileIO,
+  // which does not support remote signing.
+  org.apache.iceberg.CatalogProperties.FILE_IO_IMPL,
+  "org.apache.iceberg.io.ResolvingFileIO",
+})
+public abstract class PolarisS3RemoteSigningIntegrationTest
+    extends PolarisRestCatalogAwsIntegrationTest {
+
+  @Override
+  protected ImmutableMap.Builder<String, String> clientFileIOProperties() {
+    ImmutableMap.Builder<String, String> builder =
+        super.clientFileIOProperties()
+            .put(AWS_PATH_STYLE_ACCESS.getPropertyName(), 
String.valueOf(pathStyleAccess()));
+    endpoint().ifPresent(endpoint -> 
builder.put(AWS_ENDPOINT.getPropertyName(), endpoint));
+    return builder;
+  }
+
+  /**
+   * Override this method to return true if the S3 client should use 
path-style access. Default is
+   * false, which means virtual-hosted-style access will be used.
+   */
+  protected boolean pathStyleAccess() {
+    return false;
+  }
+
+  @CatalogConfig(properties = {"polaris.config.remote-signing.enabled", 
"false"})
+  @Test
+  public void testInternalCatalogRemoteSigningDisabled() {
+    @SuppressWarnings("resource")
+    RESTCatalog catalog = catalog();
+    Namespace ns1 = Namespace.of("ns1");
+    catalog.createNamespace(ns1);
+    TableIdentifier tableIdentifier = TableIdentifier.of(ns1, "my_table");
+    assertThatThrownBy(() -> catalog.createTable(tableIdentifier, SCHEMA))
+        .isInstanceOf(ForbiddenException.class)
+        .hasMessageContaining("Remote signing is not enabled for this catalog")
+        
.hasMessageContaining(FeatureConfiguration.REMOTE_SIGNING_ENABLED.key())
+        
.hasMessageContaining(FeatureConfiguration.REMOTE_SIGNING_ENABLED.catalogConfig());
+  }
+
+  @CatalogConfig(Catalog.TypeEnum.EXTERNAL)
+  @Test
+  public void testExternalCatalogRemoteSigningDisabled() {
+    @SuppressWarnings("resource")
+    RESTCatalog catalog = catalog();
+    Namespace ns1 = Namespace.of("ns1");
+    catalog.createNamespace(ns1);
+    TableMetadata tableMetadata =
+        TableMetadata.newTableMetadata(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            externalCatalogBaseLocation() + "/ns1/my_table",
+            Map.of());
+    try (ResolvingFileIO resolvingFileIO = initializeClientFileIO(new 
ResolvingFileIO())) {
+      String fileLocation =
+          externalCatalogBaseLocation() + 
"/ns1/my_table/metadata/v1.metadata.json";
+      TableMetadataParser.write(tableMetadata, 
resolvingFileIO.newOutputFile(fileLocation));
+      catalog.registerTable(TableIdentifier.of(ns1, "my_table"), fileLocation);
+      try {
+        assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of(ns1, 
"my_table")))
+            .isInstanceOf(ForbiddenException.class)
+            .hasMessageContaining("Remote signing is not enabled for external 
catalogs")
+            .hasMessageContaining(
+                
FeatureConfiguration.REMOTE_SIGNING_EXTERNAL_CATALOGS_ENABLED.key());
+      } finally {
+        resolvingFileIO.deleteFile(fileLocation);
+      }
+    }
+  }
+
+  @Test
+  @Override
+  @Disabled("It's not possible to request an access delegation mode when 
registering a table.")
+  public void testRegisterTable() {

Review Comment:
   I _think_ it's worth trying out whether the register-table endpoint receives 
the `X-Iceberg-Access-Delegation` header. In Nessie, we accept that header in 
the register-table endpoint and haven't ignored this test.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -710,30 +697,26 @@ public Optional<LoadTableResponse> 
loadTableWithAccessDelegationIfStale(
           read, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);
     }
 
-    PolarisResolvedPathWrapper catalogPath = 
resolutionManifest.getResolvedReferenceCatalogEntity();
-    callContext
-        .getPolarisCallContext()
-        .getDiagServices()
-        .checkNotNull(catalogPath, "No catalog available for loadTable 
request");
-    CatalogEntity catalogEntity = 
CatalogEntity.of(catalogPath.getRawLeafEntity());
-    LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
-    LOGGER.info(
-        "allow external catalog credential vending: {}",
-        callContext
-            .getRealmConfig()
-            .getConfig(
-                
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));
-    if (catalogEntity
-            .getCatalogType()
-            
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
-        && !callContext
-            .getRealmConfig()
-            .getConfig(
-                
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) 
{
-      throw new ForbiddenException(
-          "Access Delegation is not enabled for this catalog. Please consult 
applicable "
-              + "documentation for the catalog config property '%s' to enable 
this feature",
-          
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
+    CatalogEntity catalogEntity = getCatalogEntity();
+
+    if (delegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
+
+      LOGGER.info(
+          "allow external catalog credential vending: {}",
+          callContext
+              .getRealmConfig()
+              .getConfig(
+                  
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));

Review Comment:
   Nit: extract this value (duplicated) to a variable.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java:
##########
@@ -205,11 +206,19 @@ public Catalog.TypeEnum getCatalogType() {
         .orElse(null);
   }
 
+  public boolean isExternal() {
+    return Objects.equals(getCatalogType(), Catalog.TypeEnum.EXTERNAL);

Review Comment:
   ```suggestion
       return getCatalogType() == Catalog.TypeEnum.EXTERNAL;
   ```
   enum values have object-identity ;)



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/aws/signer/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.aws.signer;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.service.aws.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.aws.sign.model.PolarisS3SignResponse;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+
+  public S3RemoteSigningCatalogHandler(
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      SecurityContext securityContext,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(callContext, resolutionManifestFactory, securityContext, 
catalogName, authorizer);
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    // no need to materialize the catalog here, as we only need the catalog 
entity
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);
+
+    // TODO authorize based on the request's method?
+
+    try {
+      authorizeBasicTableLikeOperationOrThrow(
+          PolarisAuthorizableOperation.SIGN_S3_REQUEST,
+          PolarisEntitySubType.ICEBERG_TABLE,
+          tableIdentifier);
+    } catch (NoSuchTableException e) {
+      // FIXME can we do better here?
+      authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
+          PolarisAuthorizableOperation.SIGN_S3_REQUEST, tableIdentifier);
+    }
+
+    throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), 
catalogEntity);

Review Comment:
   Nit: move before the `try {`



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/aws/signer/S3RemoteSigningCatalogAdapter.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.aws.signer;
+
+import com.google.common.annotations.VisibleForTesting;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.function.Function;
+import org.apache.iceberg.aws.s3.signer.S3SignResponse;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.service.aws.sign.api.IcebergRestS3SignerApiService;
+import org.apache.polaris.service.aws.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
+import org.apache.polaris.service.catalog.common.CatalogAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An adapter between generated service types like 
`IcebergRestCatalogApiService` and
+ * `IcebergCatalogHandler`.
+ */
+@RequestScoped
+public class S3RemoteSigningCatalogAdapter
+    implements IcebergRestS3SignerApiService, CatalogAdapter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogAdapter.class);
+
+  @Inject CallContext callContext;
+  @Inject ResolutionManifestFactory resolutionManifestFactory;
+  @Inject PolarisAuthorizer polarisAuthorizer;
+  @Inject CatalogPrefixParser prefixParser;
+  @Inject S3RequestSigner s3RequestSigner;
+
+  /**
+   * Execute operations on a catalog wrapper and ensure we close the 
BaseCatalog afterward. This
+   * will typically ensure the underlying FileIO is closed.
+   */
+  private Response withCatalog(
+      SecurityContext securityContext,
+      String prefix,
+      Function<S3RemoteSigningCatalogHandler, Response> action) {
+    String catalogName = 
prefixParser.prefixToCatalogName(callContext.getRealmContext(), prefix);
+    try (S3RemoteSigningCatalogHandler handler = 
newHandlerWrapper(securityContext, catalogName)) {
+      return action.apply(handler);
+    } catch (RuntimeException e) {
+      LOGGER.debug("RuntimeException while operating on catalog. Propagating 
to caller.", e);
+      throw e;
+    } catch (Exception e) {
+      LOGGER.error("Error while operating on catalog", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  S3RemoteSigningCatalogHandler newHandlerWrapper(
+      SecurityContext securityContext, String catalogName) {
+    validatePrincipal(securityContext);
+    return new S3RemoteSigningCatalogHandler(
+        callContext,
+        resolutionManifestFactory,
+        securityContext,
+        catalogName,
+        polarisAuthorizer,
+        s3RequestSigner);
+  }
+
+  @Override
+  public Response signS3Request(
+      String prefix,
+      String namespace,
+      String table,
+      PolarisS3SignRequest s3SignRequest,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(ns, 
RESTUtil.decodeString(table));
+    return withCatalog(
+        securityContext,
+        prefix,
+        catalog -> {
+          S3SignResponse response = catalog.signS3Request(s3SignRequest, 
tableIdentifier);
+          return Response.status(Response.Status.OK).entity(response).build();

Review Comment:
   ... but that would require clients to respect those 🤷 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to