This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 24d40ee041 [#10570][followup] feat(iceberg): Skip the authorization 
for the remote REST catalog (#10704)
24d40ee041 is described below

commit 24d40ee041ec35221abdc62b696db209270c495d
Author: roryqi <[email protected]>
AuthorDate: Tue Apr 21 16:01:35 2026 +0800

    [#10570][followup] feat(iceberg): Skip the authorization for the remote 
REST catalog (#10704)
    
    ### What changes were proposed in this pull request?
    
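    Skip authorization in the Gravitino Iceberg REST service (IRC1) when the
    backend catalog is a remote REST catalog (IRC2), so that IRC1 acts as a
    proxy and IRC2 handles authorization. The behavior is controlled by the new
    `gravitino.iceberg-rest.disable-rest-authz` option.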
    
    ### Why are the changes needed?
    
    This is a follow-up PR to #10570.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, added a new config option `gravitino.iceberg-rest.disable-rest-authz`
    (default `true`).
    
    ### How was this patch tested?
    
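    Added unit tests covering the new config key and the authorization
    interceptor's skip behavior for REST catalogs.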
---
 .../lakehouse/iceberg/IcebergConstants.java        |   1 +
 docs/iceberg-rest-service.md                       |  21 +-
 .../gravitino/iceberg/common/IcebergConfig.java    |   9 +
 .../iceberg/common/ops/IcebergCatalogWrapper.java  |   5 +
 .../iceberg/common/TestIcebergConfig.java          |   9 +
 .../org/apache/gravitino/iceberg/RESTService.java  |   8 +-
 .../iceberg/service/CatalogWrapperForREST.java     |   9 +-
 .../authorization/IcebergRESTServerContext.java    |   9 +
 ...BaseMetadataAuthorizationMethodInterceptor.java |  15 +-
 ...bergMetadataAuthorizationMethodInterceptor.java |  31 ++-
 .../TestIcebergCatalogWrapperManagerForREST.java   |   6 +-
 .../iceberg/service/TestIcebergRESTUtils.java      |   2 +-
 .../dispatcher/TestIcebergTableHookDispatcher.java |   2 +-
 .../provider/TestDynamicIcebergConfigProvider.java |   3 +-
 .../iceberg/service/rest/IcebergRestTestUtil.java  |   3 +-
 ...bergMetadataAuthorizationMethodInterceptor.java | 230 ++++++++++++++++++++-
 16 files changed, 338 insertions(+), 25 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index c8418d7dc7..cbfe027188 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -101,6 +101,7 @@ public class IcebergConstants {
   public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino";
   public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog";
   public static final String ICEBERG_REST_DEFAULT_DYNAMIC_CATALOG_NAME = 
"default-catalog-name";
+  public static final String ICEBERG_REST_DISABLE_REST_AUTHZ = 
"disable-rest-authz";
 
   public static final String TABLE_METADATA_CACHE_IMPL = 
"table-metadata-cache-impl";
   public static final String TABLE_METADATA_CACHE_CAPACITY = 
"table-metadata-cache-capacity";
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 1e437425f3..9405823a46 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -130,12 +130,15 @@ If you are using multiple JDBC catalog backends, setting 
`jdbc-initialize` to tr
 
 Use the REST backend to proxy another Iceberg REST catalog server (IRC2). The 
Gravitino Iceberg REST service acts as IRC1 and forwards catalog operations to 
IRC2.
 
-| Configuration item                       | Description                       
                                                                                
            | Default value | Required | Since Version |
-|------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
-| `gravitino.iceberg-rest.catalog-backend` | The Catalog backend of the 
Gravitino Iceberg REST catalog service. Use the value **`rest`** for the REST 
catalog backend.     | `memory`      | Yes      | 0.2.0         |
-| `gravitino.iceberg-rest.uri`             | The Iceberg REST catalog URI 
(IRC2), such as `http://127.0.0.1:9001/iceberg`.                                
                 | (none)        | Yes      | 0.2.0         |
-| `gravitino.iceberg-rest.warehouse`       | The catalog name in the Iceberg 
REST spec. Set to a specific catalog name, or leave empty to use the default 
catalog on IRC2. | (none)        | No       | 0.2.0         |
-| `gravitino.iceberg-rest.data-access`     | Data access mode exposed to 
Iceberg REST clients via `/v1/config`. Supported values: `vended-credentials`, 
`remote-signing`.  | (none)        | No       | 1.3.0         |
+By default, when the backend catalog is a REST catalog, IRC1 skips 
authorization and behaves as a proxy. IRC2 handles authorization. If you want 
IRC1 to keep authorization checks, set 
`gravitino.iceberg-rest.disable-rest-authz=false`.
+
+| Configuration item                                                  | 
Description                                                                     
                                                                               
| Default value | Required | Since Version |
+|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.catalog-backend`                            | The 
Catalog backend of the Gravitino Iceberg REST catalog service. Use the value 
**`rest`** for the REST catalog backend.                                      | 
`memory`      | Yes      | 0.2.0         |
+| `gravitino.iceberg-rest.uri`                                        | The 
Iceberg REST catalog URI (IRC2), such as `http://127.0.0.1:9001/iceberg`.       
                                                                           | 
(none)        | Yes      | 0.2.0         |
+| `gravitino.iceberg-rest.warehouse`                                  | The 
catalog name in the Iceberg REST spec. Set to a specific catalog name, or leave 
empty to use the default catalog on IRC2.                                  | 
(none)        | No       | 0.2.0         |
+| `gravitino.iceberg-rest.data-access`                                | Data 
access mode exposed to Iceberg REST clients via `/v1/config`. Supported values: 
`vended-credentials`, `remote-signing`.                                   | 
(none)        | No       | 1.3.0         |
+| `gravitino.iceberg-rest.disable-rest-authz`                         | 
Whether IRC1 disables authorization when the target backend catalog is a REST 
catalog. Set to `false` if you want IRC1 to enforce authorization before 
proxying. | `true`        | No       | 1.3.0         |
 
 IRC1 configuration example if IRC2 using HDFS storage:
 
@@ -158,6 +161,10 @@ gravitino.iceberg-rest.header.X-Iceberg-Access-Delegation 
= vended-credentials
 
 IRC1 must also configure S3 configurations if the client side requests 
credential vending.
 
+:::caution
+If IRC2 does not enforce authorization, keeping 
`gravitino.iceberg-rest.disable-rest-authz=true` can leave operations 
unprotected. Set it to `false` to enforce authorization in IRC1.
+:::
+
 `data-access` is returned in `/v1/config` defaults for REST clients:
 
 - `vended-credentials`: clients should request credential vending 
(`X-Iceberg-Access-Delegation: vended-credentials`).
@@ -468,7 +475,7 @@ View operations are supported when using the JDBC catalog 
backend with schema ve
 
 | Configuration item                           | Description                   
                                                             | Default value | 
Required | Since Version |
 
|----------------------------------------------|--------------------------------------------------------------------------------------------|---------------|----------|---------------|
-| `gravitino.iceberg-rest.jdbc-schema-version` | The schema version of the 
JDBC catalog backend. Set to `V1` to enable view operations.    | `V0`          
| No       | 1.2.0         |
+| `gravitino.iceberg-rest.jdbc-schema-version` | The schema version of the 
JDBC catalog backend. Set to `V1` to enable view operations.     | `V0`         
 | No       | 1.2.0         |
 
 ### Other Apache Iceberg catalog properties
 
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index f4b83b68e1..4da89e37ef 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -220,6 +220,15 @@ public class IcebergConfig extends Config implements 
OverwriteDefaultConfig {
           .stringConf()
           
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);
 
+  public static final ConfigEntry<Boolean> ICEBERG_REST_DISABLE_REST_AUTHZ =
+      new ConfigBuilder(IcebergConstants.ICEBERG_REST_DISABLE_REST_AUTHZ)
+          .doc(
+              "Whether to disable authorization in IRC1 when backend catalog 
is a REST catalog. "
+                  + "Set to false to enforce authorization in IRC1 before 
proxying to IRC2.")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .booleanConf()
+          .createWithDefault(true);
+
   public static final ConfigEntry<String> GRAVITINO_URI =
       new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
           .doc(
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
index 750efd92d5..97919418a6 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport;
 import org.apache.iceberg.rest.CatalogHandlers;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.CreateViewRequest;
@@ -372,6 +373,10 @@ public class IcebergCatalogWrapper implements 
AutoCloseable {
     }
   }
 
+  public boolean isRESTCatalog() {
+    return getCatalog() instanceof RESTCatalog;
+  }
+
   /**
    * Whether the wrapper is recreated with a different classloader.
    *
diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/TestIcebergConfig.java
 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/TestIcebergConfig.java
index 3294c72b26..86da78ed60 100644
--- 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/TestIcebergConfig.java
+++ 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/TestIcebergConfig.java
@@ -62,4 +62,13 @@ public class TestIcebergConfig {
     Assertions.assertEquals(1000, jettyServerConfig.getHttpPort());
     Assertions.assertEquals(1001, jettyServerConfig.getHttpsPort());
   }
+
+  @Test
+  public void testDisableRestAuthzConfigKey() {
+    Map<String, String> propertiesWithNewKey =
+        
ImmutableMap.of(IcebergConfig.ICEBERG_REST_DISABLE_REST_AUTHZ.getKey(), 
"false");
+    IcebergConfig icebergConfigWithNewKey = new 
IcebergConfig(propertiesWithNewKey);
+    Assertions.assertFalse(
+        
icebergConfigWithNewKey.get(IcebergConfig.ICEBERG_REST_DISABLE_REST_AUTHZ));
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index f4bcfedd8b..06655e541c 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -103,6 +103,8 @@ public class RESTService implements 
GravitinoAuxiliaryService {
     this.configProvider = 
IcebergConfigProviderFactory.create(configProperties);
     configProvider.initialize(configProperties);
     String metalakeName = configProvider.getMetalakeName();
+    boolean skipAuthorizationForRestBackend =
+        icebergConfig.get(IcebergConfig.ICEBERG_REST_DISABLE_REST_AUTHZ);
 
     Boolean enableAuth = 
GravitinoEnv.getInstance().config().get(Configs.ENABLE_AUTHORIZATION);
     EventBus eventBus = GravitinoEnv.getInstance().eventBus();
@@ -110,7 +112,11 @@ public class RESTService implements 
GravitinoAuxiliaryService {
         new IcebergCatalogWrapperManager(configProperties, configProvider, 
auxMode, metalakeName);
     IcebergRESTServerContext authorizationContext =
         IcebergRESTServerContext.create(
-            configProvider, enableAuth, auxMode, icebergCatalogWrapperManager);
+            configProvider,
+            enableAuth,
+            auxMode,
+            skipAuthorizationForRestBackend,
+            icebergCatalogWrapperManager);
     this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
     IcebergTableOperationDispatcher icebergTableOperationDispatcher =
         new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index dc1f6858c0..f69dd481d7 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -128,7 +128,7 @@ public class CatalogWrapperForREST extends 
IcebergCatalogWrapper {
   public LoadTableResponse createTable(
       Namespace namespace, CreateTableRequest request, boolean 
requestCredential) {
     LoadTableResponse loadTableResponse;
-    if (getCatalog() instanceof RESTCatalog) {
+    if (isRESTCatalog()) {
       loadTableResponse = createTableInternal(namespace, request);
     } else {
       loadTableResponse = super.createTable(namespace, request);
@@ -145,7 +145,7 @@ public class CatalogWrapperForREST extends 
IcebergCatalogWrapper {
   public LoadTableResponse loadTable(
       TableIdentifier identifier, boolean requestCredential, 
CredentialPrivilege privilege) {
     LoadTableResponse loadTableResponse;
-    if (getCatalog() instanceof RESTCatalog) {
+    if (isRESTCatalog()) {
       loadTableResponse = loadTableInternal(identifier);
     } else {
       loadTableResponse = super.loadTable(identifier);
@@ -159,9 +159,8 @@ public class CatalogWrapperForREST extends 
IcebergCatalogWrapper {
   @Override
   public LoadTableResponse updateTable(
       TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
-    Catalog loadedCatalog = getCatalog();
-    if (loadedCatalog instanceof RESTCatalog) {
-      return CatalogHandlers.updateTable(loadedCatalog, tableIdentifier, 
updateTableRequest);
+    if (isRESTCatalog()) {
+      return CatalogHandlers.updateTable(getCatalog(), tableIdentifier, 
updateTableRequest);
     } else {
       return super.updateTable(tableIdentifier, updateTableRequest);
     }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/authorization/IcebergRESTServerContext.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/authorization/IcebergRESTServerContext.java
index b61c05a39c..ad3a2aae8e 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/authorization/IcebergRESTServerContext.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/authorization/IcebergRESTServerContext.java
@@ -26,6 +26,7 @@ import 
org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
 public class IcebergRESTServerContext {
   private boolean isAuthorizationEnabled;
   private boolean auxMode;
+  private boolean skipAuthorizationForRestBackend;
   private String metalakeName;
   private String defaultCatalogName;
   private IcebergCatalogWrapperManager catalogWrapperManager;
@@ -33,11 +34,13 @@ public class IcebergRESTServerContext {
   private IcebergRESTServerContext(
       Boolean isAuthorizationEnabled,
       Boolean auxMode,
+      Boolean skipAuthorizationForRestBackend,
       String metalakeName,
       String defaultCatalogName,
       IcebergCatalogWrapperManager catalogWrapperManager) {
     this.isAuthorizationEnabled = isAuthorizationEnabled;
     this.auxMode = auxMode;
+    this.skipAuthorizationForRestBackend = skipAuthorizationForRestBackend;
     this.metalakeName = metalakeName;
     this.defaultCatalogName = defaultCatalogName;
     this.catalogWrapperManager = catalogWrapperManager;
@@ -51,11 +54,13 @@ public class IcebergRESTServerContext {
       IcebergConfigProvider configProvider,
       Boolean enableAuth,
       Boolean auxMode,
+      Boolean skipAuthorizationForRestBackend,
       IcebergCatalogWrapperManager catalogWrapperManager) {
     InstanceHolder.INSTANCE =
         new IcebergRESTServerContext(
             enableAuth,
             auxMode,
+            skipAuthorizationForRestBackend,
             configProvider.getMetalakeName(),
             configProvider.getDefaultCatalogName(),
             catalogWrapperManager);
@@ -75,6 +80,10 @@ public class IcebergRESTServerContext {
     return auxMode;
   }
 
+  public boolean skipAuthorizationForRestBackend() {
+    return skipAuthorizationForRestBackend;
+  }
+
   public String metalakeName() {
     return metalakeName;
   }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
index 4632d70a68..f2553751bd 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/BaseMetadataAuthorizationMethodInterceptor.java
@@ -103,6 +103,15 @@ public abstract class 
BaseMetadataAuthorizationMethodInterceptor implements Meth
     return false;
   }
 
+  /**
+   * Hook for subclasses to skip standard authorization before user validation 
and expression
+   * evaluation.
+   */
+  protected boolean shouldSkipAuthorization(
+      Map<Entity.EntityType, NameIdentifier> nameIdentifierMap) {
+    return false;
+  }
+
   /**
    * Determine whether authorization is required and the rules via the 
authorization annotation ,
    * and obtain the metadata ID that requires authorization via the 
authorization annotation.
@@ -123,11 +132,12 @@ public abstract class 
BaseMetadataAuthorizationMethodInterceptor implements Meth
         Object[] args = methodInvocation.getArguments();
         Map<Entity.EntityType, NameIdentifier> nameIdentifierMap =
             extractNameIdentifierFromParameters(parameters, args);
+        boolean skipStandardCheck = shouldSkipAuthorization(nameIdentifierMap);
 
         // Check if current user exists in the metalake.
         NameIdentifier metalakeIdent = 
nameIdentifierMap.get(Entity.EntityType.METALAKE);
 
-        if (metalakeIdent != null) {
+        if (!skipStandardCheck && metalakeIdent != null) {
           String currentUser = PrincipalUtils.getCurrentUserName();
           try {
             AuthorizationUtils.checkCurrentUser(metalakeIdent.name(), 
currentUser);
@@ -151,9 +161,8 @@ public abstract class 
BaseMetadataAuthorizationMethodInterceptor implements Meth
 
         // Process custom authorization if handler exists
         Optional<AuthorizationHandler> handler = 
createAuthorizationHandler(parameters, args);
-        boolean skipStandardCheck = false;
 
-        if (handler.isPresent()) {
+        if (!skipStandardCheck && handler.isPresent()) {
           AuthorizationHandler authzHandler = handler.get();
           authzHandler.process(nameIdentifierMap);
           skipStandardCheck = authzHandler.authorizationCompleted();
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
index 643e8eb532..0ff52f2781 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/server/web/filter/IcebergMetadataAuthorizationMethodInterceptor.java
@@ -26,6 +26,8 @@ import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
 import org.apache.gravitino.iceberg.service.IcebergRESTUtils;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
@@ -41,7 +43,6 @@ import org.apache.iceberg.rest.RESTUtil;
  */
 public class IcebergMetadataAuthorizationMethodInterceptor
     extends BaseMetadataAuthorizationMethodInterceptor {
-
   private final String metalakeName = 
IcebergRESTServerContext.getInstance().metalakeName();
 
   @Override
@@ -131,4 +132,32 @@ public class IcebergMetadataAuthorizationMethodInterceptor
   protected boolean isExceptionPropagate(Exception e) {
     return e.getClass().getName().startsWith("org.apache.iceberg.exceptions");
   }
+
+  /**
+   * Skip local authorization only when this server proxies requests to 
another Gravitino-backed
+   * Iceberg REST catalog. In that topology, the downstream REST backend 
performs authorization and
+   * this interceptor avoids duplicate checks.
+   */
+  @Override
+  protected boolean shouldSkipAuthorization(Map<EntityType, NameIdentifier> 
nameIdentifierMap) {
+    if 
(!IcebergRESTServerContext.getInstance().skipAuthorizationForRestBackend()) {
+      return false;
+    }
+
+    NameIdentifier catalogId = nameIdentifierMap.get(EntityType.CATALOG);
+    if (catalogId == null) {
+      return false;
+    }
+
+    IcebergCatalogWrapperManager wrapperManager =
+        IcebergRESTServerContext.getInstance().catalogWrapperManager();
+    if (wrapperManager == null) {
+      return false;
+    }
+
+    IcebergCatalogWrapper catalogWrapper = 
wrapperManager.getCatalogWrapper(catalogId.name());
+    // When IRC2 is another Gravitino server, IRC1 acts as a proxy and does 
not perform
+    // authorization. IRC2 handles authorization.
+    return catalogWrapper.isRESTCatalog();
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
index 47589e6549..f81ea32633 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManagerForREST.java
@@ -67,7 +67,7 @@ public class TestIcebergCatalogWrapperManagerForREST {
     IcebergCatalogWrapperManager manager =
         new IcebergCatalogWrapperManager(
             config, configProvider, false, configProvider.getMetalakeName());
-    IcebergRESTServerContext.create(configProvider, false, false, manager);
+    IcebergRESTServerContext.create(configProvider, false, false, true, 
manager);
 
     IcebergCatalogWrapper ops = manager.getOps(rawPrefix);
 
@@ -87,7 +87,7 @@ public class TestIcebergCatalogWrapperManagerForREST {
     IcebergCatalogWrapperManager manager =
         new IcebergCatalogWrapperManager(
             config, configProvider, false, configProvider.getMetalakeName());
-    IcebergRESTServerContext.create(configProvider, false, false, manager);
+    IcebergRESTServerContext.create(configProvider, false, false, true, 
manager);
 
     Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
manager.getOps(rawPrefix));
   }
@@ -100,7 +100,7 @@ public class TestIcebergCatalogWrapperManagerForREST {
     IcebergCatalogWrapperManager manager =
         new IcebergCatalogWrapperManager(
             config, configProvider, true, configProvider.getMetalakeName());
-    IcebergRESTServerContext.create(configProvider, true, true, manager);
+    IcebergRESTServerContext.create(configProvider, true, true, true, manager);
 
     IllegalArgumentException exception =
         Assertions.assertThrowsExactly(
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
index 597151ca70..7c2e05c80b 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
@@ -45,7 +45,7 @@ public class TestIcebergRESTUtils {
     
Mockito.when(icebergConfigProvider.getMetalakeName()).thenReturn("metalake");
     Mockito.when(icebergConfigProvider.getDefaultCatalogName())
         .thenReturn(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
-    IcebergRESTServerContext.create(icebergConfigProvider, false, false, null);
+    IcebergRESTServerContext.create(icebergConfigProvider, false, false, true, 
null);
   }
 
   @Test
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
index ef5f25f832..837f5e1b8c 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergTableHookDispatcher.java
@@ -95,7 +95,7 @@ public class TestIcebergTableHookDispatcher {
     IcebergConfigProvider mockConfigProvider = 
mock(IcebergConfigProvider.class);
     when(mockConfigProvider.getMetalakeName()).thenReturn(TEST_METALAKE);
     when(mockConfigProvider.getDefaultCatalogName()).thenReturn(TEST_CATALOG);
-    IcebergRESTServerContext.create(mockConfigProvider, false, false, null);
+    IcebergRESTServerContext.create(mockConfigProvider, false, false, true, 
null);
 
     // Create hook dispatcher
     hookDispatcher = new IcebergTableHookDispatcher(mockDispatcher);
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
index ed527162db..669e41a24d 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
@@ -72,7 +72,8 @@ public class TestDynamicIcebergConfigProvider {
     Mockito.when(mockProvider.getMetalakeName()).thenReturn("test_metalake");
     
Mockito.when(mockProvider.getDefaultCatalogName()).thenReturn("default_catalog");
     // When authorization is enabled, it implies running in auxiliary mode
-    IcebergRESTServerContext.create(mockProvider, authorizationEnabled, 
authorizationEnabled, null);
+    IcebergRESTServerContext.create(
+        mockProvider, authorizationEnabled, authorizationEnabled, true, null);
   }
 
   private void resetServerContext() throws IllegalAccessException {
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 23456758a8..a75c9c76ac 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -135,7 +135,8 @@ public class IcebergRestTestUtil {
       IcebergCatalogWrapperManager icebergCatalogWrapperManager =
           new IcebergCatalogWrapperManagerForTest(
               catalogConf, configProvider, false, 
configProvider.getMetalakeName());
-      IcebergRESTServerContext.create(configProvider, false, false, 
icebergCatalogWrapperManager);
+      IcebergRESTServerContext.create(
+          configProvider, false, false, true, icebergCatalogWrapperManager);
 
       EventBus eventBus = new EventBus(eventListenerPlugins);
 
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/server/web/filter/TestIcebergMetadataAuthorizationMethodInterceptor.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/server/web/filter/TestIcebergMetadataAuthorizationMethodInterceptor.java
index 1be3417158..5db3cca4af 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/server/web/filter/TestIcebergMetadataAuthorizationMethodInterceptor.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/server/web/filter/TestIcebergMetadataAuthorizationMethodInterceptor.java
@@ -21,19 +21,30 @@ package org.apache.gravitino.server.web.filter;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.lang.reflect.Method;
 import java.lang.reflect.Parameter;
 import java.util.Map;
+import java.util.Optional;
+import javax.ws.rs.core.Response;
+import org.aopalliance.intercept.MethodInvocation;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
+import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
 import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
 import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -53,7 +64,7 @@ public class TestIcebergMetadataAuthorizationMethodInterceptor {
     IcebergConfigProvider mockConfigProvider = Mockito.mock(IcebergConfigProvider.class);
     Mockito.when(mockConfigProvider.getMetalakeName()).thenReturn(TEST_METALAKE);
     Mockito.when(mockConfigProvider.getDefaultCatalogName()).thenReturn(TEST_CATALOG);
-    IcebergRESTServerContext.create(mockConfigProvider, false, false, null);
+    IcebergRESTServerContext.create(mockConfigProvider, false, false, true, null);
   }
 
   @Test
@@ -165,6 +176,215 @@ public class TestIcebergMetadataAuthorizationMethodInterceptor {
     assertFalse(interceptor.isExceptionPropagate(otherException));
   }
 
+  @Test
+  public void testInvokeSkipsAuthorizationForRestCatalog() throws Throwable {
+    IcebergCatalogWrapperManager wrapperManager = Mockito.mock(IcebergCatalogWrapperManager.class);
+    CatalogWrapperForREST wrapper = Mockito.mock(CatalogWrapperForREST.class);
+    RESTCatalog restCatalog = Mockito.mock(RESTCatalog.class);
+    Mockito.when(wrapperManager.getCatalogWrapper(TEST_CATALOG)).thenReturn(wrapper);
+    Mockito.when(wrapper.getCatalog()).thenReturn(restCatalog);
+    Mockito.when(wrapper.isRESTCatalog()).thenReturn(true);
+    resetContext(wrapperManager, true);
+
+    Method method =
+        TestOperations.class.getMethod(
+            "testTableOperationWithAuthorizationExpression",
+            String.class,
+            String.class,
+            String.class);
+    MethodInvocation invocation = Mockito.mock(MethodInvocation.class);
+    Mockito.when(invocation.getMethod()).thenReturn(method);
+    Mockito.when(invocation.getArguments())
+        .thenReturn(new Object[] {TEST_CATALOG + "/", TEST_SCHEMA, "tbl"});
+    Mockito.when(invocation.proceed()).thenReturn("PROCEEDED");
+
+    IcebergMetadataAuthorizationMethodInterceptor interceptor =
+        new IcebergMetadataAuthorizationMethodInterceptor() {
+          @Override
+          protected Optional<AuthorizationHandler> createAuthorizationHandler(
+              Parameter[] parameters, Object[] args) {
+            return Optional.of(
+                new AuthorizationHandler() {
+                  @Override
+                  public void process(Map<Entity.EntityType, NameIdentifier> nameIdentifierMap)
+                      throws ForbiddenException {
+                    throw new RuntimeException("test");
+                  }
+
+                  @Override
+                  public boolean authorizationCompleted() {
+                    return false;
+                  }
+                });
+          }
+        };
+    Object result = interceptor.invoke(invocation);
+
+    assertEquals("PROCEEDED", result);
+    Mockito.verify(invocation, Mockito.times(1)).proceed();
+  }
+
+  @Test
+  public void testInvokeDoesNotSkipAuthorizationForNonRestCatalog() throws Throwable {
+    IcebergCatalogWrapperManager wrapperManager = Mockito.mock(IcebergCatalogWrapperManager.class);
+    CatalogWrapperForREST wrapper = Mockito.mock(CatalogWrapperForREST.class);
+    Catalog nonRestCatalog = Mockito.mock(Catalog.class);
+    Mockito.when(wrapperManager.getCatalogWrapper(TEST_CATALOG)).thenReturn(wrapper);
+    Mockito.when(wrapper.getCatalog()).thenReturn(nonRestCatalog);
+    Mockito.when(wrapper.isRESTCatalog()).thenReturn(false);
+    resetContext(wrapperManager, true);
+
+    Method method =
+        TestOperations.class.getMethod(
+            "testTableOperationWithAuthorizationExpression",
+            String.class,
+            String.class,
+            String.class);
+    MethodInvocation invocation = Mockito.mock(MethodInvocation.class);
+    Mockito.when(invocation.getMethod()).thenReturn(method);
+    Mockito.when(invocation.getArguments())
+        .thenReturn(new Object[] {TEST_CATALOG + "/", TEST_SCHEMA, "tbl"});
+    Mockito.when(invocation.proceed()).thenReturn("PROCEEDED");
+
+    IcebergMetadataAuthorizationMethodInterceptor interceptor =
+        new IcebergMetadataAuthorizationMethodInterceptor() {
+          @Override
+          protected Optional<AuthorizationHandler> createAuthorizationHandler(
+              Parameter[] parameters, Object[] args) {
+            return Optional.of(
+                new AuthorizationHandler() {
+                  @Override
+                  public void process(Map<Entity.EntityType, NameIdentifier> nameIdentifierMap)
+                      throws ForbiddenException {
+                    throw new RuntimeException("test");
+                  }
+
+                  @Override
+                  public boolean authorizationCompleted() {
+                    return false;
+                  }
+                });
+          }
+        };
+    Object result = interceptor.invoke(invocation);
+
+    assertNotEquals("PROCEEDED", result);
+  }
+
+  private void resetContext(IcebergCatalogWrapperManager wrapperManager) {
+    resetContext(wrapperManager, true);
+  }
+
+  private void resetContext(
+      IcebergCatalogWrapperManager wrapperManager, boolean skipAuthorizationForRestBackend) {
+    IcebergConfigProvider mockConfigProvider = Mockito.mock(IcebergConfigProvider.class);
+    Mockito.when(mockConfigProvider.getMetalakeName()).thenReturn(TEST_METALAKE);
+    Mockito.when(mockConfigProvider.getDefaultCatalogName()).thenReturn(TEST_CATALOG);
+    IcebergRESTServerContext.create(
+        mockConfigProvider, false, false, skipAuthorizationForRestBackend, wrapperManager);
+  }
+
+  @Test
+  public void testInvokeDoesNotSkipAuthorizationForRestCatalogWhenFlagDisabled() throws Throwable {
+    IcebergCatalogWrapperManager wrapperManager = Mockito.mock(IcebergCatalogWrapperManager.class);
+    CatalogWrapperForREST wrapper = Mockito.mock(CatalogWrapperForREST.class);
+    RESTCatalog restCatalog = Mockito.mock(RESTCatalog.class);
+    Mockito.when(wrapperManager.getCatalogWrapper(TEST_CATALOG)).thenReturn(wrapper);
+    Mockito.when(wrapper.getCatalog()).thenReturn(restCatalog);
+    Mockito.when(wrapper.isRESTCatalog()).thenReturn(true);
+    resetContext(wrapperManager, false);
+
+    Method method =
+        TestOperations.class.getMethod(
+            "testTableOperationWithAuthorizationExpression",
+            String.class,
+            String.class,
+            String.class);
+    MethodInvocation invocation = Mockito.mock(MethodInvocation.class);
+    Mockito.when(invocation.getMethod()).thenReturn(method);
+    Mockito.when(invocation.getArguments())
+        .thenReturn(new Object[] {TEST_CATALOG + "/", TEST_SCHEMA, "tbl"});
+    Mockito.when(invocation.proceed()).thenReturn("PROCEEDED");
+
+    IcebergMetadataAuthorizationMethodInterceptor interceptor =
+        new IcebergMetadataAuthorizationMethodInterceptor() {
+          @Override
+          protected Optional<AuthorizationHandler> createAuthorizationHandler(
+              Parameter[] parameters, Object[] args) {
+            throw new RuntimeException("test");
+          }
+        };
+    Object result = interceptor.invoke(invocation);
+
+    assertNotEquals("PROCEEDED", result);
+  }
+
+  @Test
+  public void testInvokeSkipAuthorizationStillUsesCommonProceedExceptionMapping() throws Throwable {
+    IcebergCatalogWrapperManager wrapperManager = Mockito.mock(IcebergCatalogWrapperManager.class);
+    CatalogWrapperForREST wrapper = Mockito.mock(CatalogWrapperForREST.class);
+    RESTCatalog restCatalog = Mockito.mock(RESTCatalog.class);
+    Mockito.when(wrapperManager.getCatalogWrapper(TEST_CATALOG)).thenReturn(wrapper);
+    Mockito.when(wrapper.getCatalog()).thenReturn(restCatalog);
+    Mockito.when(wrapper.isRESTCatalog()).thenReturn(true);
+    resetContext(wrapperManager, true);
+
+    Method method =
+        TestOperations.class.getMethod(
+            "testTableOperationWithAuthorizationExpression",
+            String.class,
+            String.class,
+            String.class);
+    MethodInvocation invocation = Mockito.mock(MethodInvocation.class);
+    Mockito.when(invocation.getMethod()).thenReturn(method);
+    Mockito.when(invocation.getArguments())
+        .thenReturn(new Object[] {TEST_CATALOG + "/", TEST_SCHEMA, "tbl"});
+    Mockito.when(invocation.proceed()).thenThrow(new RuntimeException("operation failed"));
+
+    IcebergMetadataAuthorizationMethodInterceptor interceptor =
+        new IcebergMetadataAuthorizationMethodInterceptor();
+    Object result = interceptor.invoke(invocation);
+
+    assertTrue(result instanceof Response);
+    Response response = (Response) result;
+    assertEquals(500, response.getStatus());
+    ErrorResponse errorResponse = (ErrorResponse) response.getEntity();
+    assertEquals("operation failed", errorResponse.message());
+  }
+
+  @Test
+  public void testInvokeShouldSkipAuthorizationWrapperLookupFailureReturnsAuthInternalError()
+      throws Throwable {
+    IcebergCatalogWrapperManager wrapperManager = Mockito.mock(IcebergCatalogWrapperManager.class);
+    Mockito.when(wrapperManager.getCatalogWrapper(TEST_CATALOG))
+        .thenThrow(new IllegalArgumentException("wrapper lookup failed"));
+    resetContext(wrapperManager, true);
+
+    Method method =
+        TestOperations.class.getMethod(
+            "testTableOperationWithAuthorizationExpression",
+            String.class,
+            String.class,
+            String.class);
+    MethodInvocation invocation = Mockito.mock(MethodInvocation.class);
+    Mockito.when(invocation.getMethod()).thenReturn(method);
+    Mockito.when(invocation.getArguments())
+        .thenReturn(new Object[] {TEST_CATALOG + "/", TEST_SCHEMA, "tbl"});
+    Mockito.when(invocation.proceed()).thenReturn("PROCEEDED");
+
+    IcebergMetadataAuthorizationMethodInterceptor interceptor =
+        new IcebergMetadataAuthorizationMethodInterceptor();
+    Object result = interceptor.invoke(invocation);
+
+    assertTrue(result instanceof Response);
+    Response response = (Response) result;
+    assertEquals(500, response.getStatus());
+    ErrorResponse errorResponse = (ErrorResponse) response.getEntity();
+    assertEquals("RuntimeException", errorResponse.type());
+    assertTrue(
+        errorResponse.message().contains("Authorization failed due to system internal error"));
+  }
+
   /** Test operations class to provide method annotations for testing. */
   @SuppressWarnings("unused")
   public static class TestOperations {
@@ -174,5 +394,13 @@ public class TestIcebergMetadataAuthorizationMethodInterceptor {
         @AuthorizationMetadata(type = Entity.EntityType.TABLE) String table) {
       // Test method
     }
+
+    @AuthorizationExpression(expression = "true")
+    public void testTableOperationWithAuthorizationExpression(
+        @AuthorizationMetadata(type = Entity.EntityType.CATALOG) String prefix,
+        @AuthorizationMetadata(type = Entity.EntityType.SCHEMA) String namespace,
+        @AuthorizationMetadata(type = Entity.EntityType.TABLE) String table) {
+      // Test method
+    }
   }
 }


Reply via email to