This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 732967d652 [#10570] improvement(iceberg): lazy initialize catalog
resources in wrapper (#10787)
732967d652 is described below
commit 732967d652066611891c0cd2797d6c2478a06288
Author: roryqi <[email protected]>
AuthorDate: Tue Apr 21 11:29:22 2026 +0800
[#10570] improvement(iceberg): lazy initialize catalog resources in wrapper
(#10787)
### What changes were proposed in this pull request?
Delay catalog, namespace adapter, and metadata cache initialization in
IcebergCatalogWrapper to avoid eager loading during construction, and
add tests that verify the lazy lifecycle and close behavior.
If remote REST catalog is temporary down, we can create the catalog
successfully, and execute requests when catalog starts successfully.
### Why are the changes needed?
This is the ##10570 followup
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
added UTs.
use a docker-compose to test it.
I have two Gravitino servers IRC1 and IRC2.
I will stop the Gravitino server IRC1, create a remote REST catalog
(this catalog can connect IRC1) in the IRC2 , start the Gravitino server
IRC1, and create the table.
---
.../iceberg/common/ops/IcebergCatalogWrapper.java | 156 ++++++++++++++-------
.../common/ops/TestIcebergCatalogWrapper.java | 143 +++++++++++++++++++
.../iceberg/service/CatalogWrapperForREST.java | 41 ++++--
.../service/IcebergCatalogWrapperManager.java | 2 +-
.../iceberg/service/TestCatalogWrapperForREST.java | 36 +++++
.../service/rest/CatalogWrapperForTest.java | 2 +-
6 files changed, 315 insertions(+), 65 deletions(-)
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
index 43295d0297..750efd92d5 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
@@ -71,14 +71,14 @@ public class IcebergCatalogWrapper implements AutoCloseable
{
public static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogWrapper.class);
- @Getter protected Catalog catalog;
- private SupportsNamespaces asNamespaceCatalog;
+ private final Object initializationLock = new Object();
+ private volatile Catalog catalog;
+ private volatile SupportsNamespaces asNamespaceCatalog;
private final IcebergCatalogBackend catalogBackend;
- @Getter private final IcebergConfig icebergConfig;
+ private final IcebergConfig icebergConfig;
private String catalogUri = null;
- private Map<String, String> catalogPropertiesMap;
- private TableMetadataCache metadataCache;
- private Configuration configuration;
+ private volatile TableMetadataCache metadataCache;
+ private final Configuration configuration;
public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
this.icebergConfig = icebergConfig;
@@ -95,66 +95,111 @@ public class IcebergCatalogWrapper implements
AutoCloseable {
if (!IcebergCatalogBackend.MEMORY.equals(catalogBackend)) {
this.catalogUri = icebergConfig.get(IcebergConfig.CATALOG_URI);
}
- this.catalog = IcebergCatalogUtil.loadCatalogBackend(catalogBackend,
icebergConfig);
- if (catalog instanceof SupportsNamespaces) {
- this.asNamespaceCatalog = (SupportsNamespaces) catalog;
+ Map<String, String> catalogPropertiesMap =
icebergConfig.getIcebergCatalogProperties();
+ this.configuration = FileSystemUtils.createConfiguration(null,
catalogPropertiesMap);
+ }
+
+ public Catalog getCatalog() {
+ Catalog loadedCatalog = catalog;
+ if (loadedCatalog != null) {
+ return loadedCatalog;
}
- this.metadataCache = loadTableMetadataCache(icebergConfig, catalog);
- this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties();
- this.configuration = FileSystemUtils.createConfiguration(null,
catalogPropertiesMap);
+ synchronized (initializationLock) {
+ if (catalog == null) {
+ catalog = IcebergCatalogUtil.loadCatalogBackend(catalogBackend,
icebergConfig);
+ }
+ return catalog;
+ }
+ }
+
+ public IcebergConfig getIcebergConfig() {
+ return icebergConfig;
+ }
+
+ private SupportsNamespaces getNamespaceCatalog() {
+ SupportsNamespaces namespaceCatalog = asNamespaceCatalog;
+ if (namespaceCatalog != null) {
+ return namespaceCatalog;
+ }
+
+ synchronized (initializationLock) {
+ if (asNamespaceCatalog == null) {
+ Catalog loadedCatalog = getCatalog();
+ if (loadedCatalog instanceof SupportsNamespaces) {
+ asNamespaceCatalog = (SupportsNamespaces) loadedCatalog;
+ }
+ }
+ return asNamespaceCatalog;
+ }
+ }
+
+ private TableMetadataCache getMetadataCache() {
+ TableMetadataCache cache = metadataCache;
+ if (cache != null) {
+ return cache;
+ }
+
+ synchronized (initializationLock) {
+ if (metadataCache == null) {
+ metadataCache = loadTableMetadataCache(icebergConfig, getCatalog());
+ }
+ return metadataCache;
+ }
}
private void validateNamespace(Optional<Namespace> namespace) {
namespace.ifPresent(
n -> Preconditions.checkArgument(!n.toString().isEmpty(), "Namespace
couldn't be empty"));
- if (asNamespaceCatalog == null) {
+ if (getNamespaceCatalog() == null) {
throw new UnsupportedOperationException(
"The underlying catalog doesn't support namespace operation");
}
}
private ViewCatalog getViewCatalog() {
- if (!(catalog instanceof ViewCatalog)) {
- throw new UnsupportedOperationException(catalog.name() + " is not
support view");
+ Catalog loadedCatalog = getCatalog();
+ if (!(loadedCatalog instanceof ViewCatalog)) {
+ throw new UnsupportedOperationException(
+ "The underlying catalog '" + loadedCatalog.name() + "' does not
support view operations");
}
- return (ViewCatalog) catalog;
+ return (ViewCatalog) loadedCatalog;
}
public CreateNamespaceResponse createNamespace(CreateNamespaceRequest
request) {
validateNamespace(Optional.of(request.namespace()));
- return CatalogHandlers.createNamespace(asNamespaceCatalog, request);
+ return CatalogHandlers.createNamespace(getNamespaceCatalog(), request);
}
public void dropNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
- CatalogHandlers.dropNamespace(asNamespaceCatalog, namespace);
+ CatalogHandlers.dropNamespace(getNamespaceCatalog(), namespace);
}
public GetNamespaceResponse loadNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
- return CatalogHandlers.loadNamespace(asNamespaceCatalog, namespace);
+ return CatalogHandlers.loadNamespace(getNamespaceCatalog(), namespace);
}
public boolean namespaceExists(Namespace namespace) {
validateNamespace(Optional.of(namespace));
- return asNamespaceCatalog.namespaceExists(namespace);
+ return getNamespaceCatalog().namespaceExists(namespace);
}
public ListNamespacesResponse listNamespace(Namespace parent) {
validateNamespace(Optional.empty());
- return CatalogHandlers.listNamespaces(asNamespaceCatalog, parent);
+ return CatalogHandlers.listNamespaces(getNamespaceCatalog(), parent);
}
public UpdateNamespacePropertiesResponse updateNamespaceProperties(
Namespace namespace, UpdateNamespacePropertiesRequest
updateNamespacePropertiesRequest) {
validateNamespace(Optional.of(namespace));
return CatalogHandlers.updateNamespaceProperties(
- asNamespaceCatalog, namespace, updateNamespacePropertiesRequest);
+ getNamespaceCatalog(), namespace, updateNamespacePropertiesRequest);
}
public LoadTableResponse registerTable(Namespace namespace,
RegisterTableRequest request) {
- return CatalogHandlers.registerTable(catalog, namespace, request);
+ return CatalogHandlers.registerTable(getCatalog(), namespace, request);
}
/**
@@ -170,36 +215,39 @@ public class IcebergCatalogWrapper implements
AutoCloseable {
public LoadTableResponse createTable(Namespace namespace, CreateTableRequest
request) {
request.validate();
+ Catalog loadedCatalog = getCatalog();
if (request.stageCreate()) {
- return CatalogHandlers.stageTableCreate(catalog, namespace, request);
+ return CatalogHandlers.stageTableCreate(loadedCatalog, namespace,
request);
}
- LoadTableResponse loadTableResponse = CatalogHandlers.createTable(catalog,
namespace, request);
+ LoadTableResponse loadTableResponse =
+ CatalogHandlers.createTable(loadedCatalog, namespace, request);
if (loadTableResponse != null) {
TableIdentifier tableIdentifier = TableIdentifier.of(namespace,
request.name());
- metadataCache.updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
+ getMetadataCache().updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
}
return loadTableResponse;
}
public void dropTable(TableIdentifier tableIdentifier) {
- metadataCache.invalidate(tableIdentifier);
- CatalogHandlers.dropTable(catalog, tableIdentifier);
+ getMetadataCache().invalidate(tableIdentifier);
+ CatalogHandlers.dropTable(getCatalog(), tableIdentifier);
}
public void purgeTable(TableIdentifier tableIdentifier) {
- metadataCache.invalidate(tableIdentifier);
- CatalogHandlers.purgeTable(catalog, tableIdentifier);
+ getMetadataCache().invalidate(tableIdentifier);
+ CatalogHandlers.purgeTable(getCatalog(), tableIdentifier);
}
public LoadTableResponse loadTable(TableIdentifier tableIdentifier) {
- Optional<TableMetadata> tableMetadataOptional =
metadataCache.getTableMetadata(tableIdentifier);
+ Optional<TableMetadata> tableMetadataOptional =
+ getMetadataCache().getTableMetadata(tableIdentifier);
if (tableMetadataOptional.isPresent()) {
return
LoadTableResponse.builder().withTableMetadata(tableMetadataOptional.get()).build();
}
- LoadTableResponse loadTableResponse = CatalogHandlers.loadTable(catalog,
tableIdentifier);
+ LoadTableResponse loadTableResponse =
CatalogHandlers.loadTable(getCatalog(), tableIdentifier);
if (loadTableResponse != null) {
- metadataCache.updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
+ getMetadataCache().updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
}
return loadTableResponse;
}
@@ -214,33 +262,34 @@ public class IcebergCatalogWrapper implements
AutoCloseable {
* support this operation
*/
public Optional<String> getTableMetadataLocation(TableIdentifier
tableIdentifier) {
- if (catalog instanceof SupportsMetadataLocation) {
+ Catalog loadedCatalog = getCatalog();
+ if (loadedCatalog instanceof SupportsMetadataLocation) {
return Optional.ofNullable(
- ((SupportsMetadataLocation)
catalog).metadataLocation(tableIdentifier));
+ ((SupportsMetadataLocation)
loadedCatalog).metadataLocation(tableIdentifier));
}
return Optional.empty();
}
public boolean tableExists(TableIdentifier tableIdentifier) {
- return catalog.tableExists(tableIdentifier);
+ return getCatalog().tableExists(tableIdentifier);
}
public ListTablesResponse listTable(Namespace namespace) {
- return CatalogHandlers.listTables(catalog, namespace);
+ return CatalogHandlers.listTables(getCatalog(), namespace);
}
public void renameTable(RenameTableRequest renameTableRequest) {
- metadataCache.invalidate(renameTableRequest.source());
- CatalogHandlers.renameTable(catalog, renameTableRequest);
+ getMetadataCache().invalidate(renameTableRequest.source());
+ CatalogHandlers.renameTable(getCatalog(), renameTableRequest);
}
public LoadTableResponse updateTable(
TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
- metadataCache.invalidate(tableIdentifier);
+ getMetadataCache().invalidate(tableIdentifier);
LoadTableResponse loadTableResponse =
- CatalogHandlers.updateTable(catalog, tableIdentifier,
updateTableRequest);
+ CatalogHandlers.updateTable(getCatalog(), tableIdentifier,
updateTableRequest);
if (loadTableResponse != null) {
- metadataCache.updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
+ getMetadataCache().updateTableMetadata(tableIdentifier,
loadTableResponse.tableMetadata());
}
return loadTableResponse;
}
@@ -283,14 +332,15 @@ public class IcebergCatalogWrapper implements
AutoCloseable {
}
public boolean supportsViewOperations() {
- if (!(catalog instanceof ViewCatalog)) {
+ Catalog loadedCatalog = getCatalog();
+ if (!(loadedCatalog instanceof ViewCatalog)) {
return false;
}
// JDBC catalog only supports view operations from v1 schema version
- if (catalog instanceof JdbcCatalogWithMetadataLocationSupport) {
+ if (loadedCatalog instanceof JdbcCatalogWithMetadataLocationSupport) {
JdbcCatalogWithMetadataLocationSupport jdbcCatalog =
- (JdbcCatalogWithMetadataLocationSupport) catalog;
+ (JdbcCatalogWithMetadataLocationSupport) loadedCatalog;
return jdbcCatalog.supportsViewsWithSchemaVersion();
}
@@ -299,13 +349,21 @@ public class IcebergCatalogWrapper implements
AutoCloseable {
@Override
public void close() throws Exception {
- LOG.info("Closing IcebergCatalogWrapper for catalog: {}", catalog.name());
- if (catalog instanceof AutoCloseable) {
+ Catalog loadedCatalog = catalog;
+ if (loadedCatalog != null) {
+ LOG.info("Closing IcebergCatalogWrapper for catalog: {}",
loadedCatalog.name());
+ } else {
+ LOG.info("Closing IcebergCatalogWrapper before catalog is initialized");
+ }
+ if (loadedCatalog instanceof AutoCloseable) {
// JdbcCatalog and ClosableHiveCatalog implement AutoCloseable and will
handle their own
// cleanup
- ((AutoCloseable) catalog).close();
+ ((AutoCloseable) loadedCatalog).close();
+ }
+ TableMetadataCache cache = metadataCache;
+ if (cache != null) {
+ cache.close();
}
- metadataCache.close();
// For Iceberg REST server which use the same classloader when recreating
catalog wrapper, the
// Driver couldn't be reloaded after deregister()
diff --git
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java
new file mode 100644
index 0000000000..bd19c61425
--- /dev/null
+++
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergCatalogWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.common.cache.SupportsMetadataLocation;
+import org.apache.gravitino.iceberg.common.cache.TableMetadataCache;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergCatalogWrapper {
+
+ @Test
+ public void testCatalogShouldBeLazyLoaded() {
+ IcebergCatalogWrapper wrapper =
+ new IcebergCatalogWrapper(new IcebergConfig(unreachableConfig()));
+
+ Assertions.assertThrows(Throwable.class, wrapper::getCatalog);
+ }
+
+ @Test
+ public void testCloseShouldNotInitializeCatalog() {
+ IcebergCatalogWrapper wrapper =
+ new IcebergCatalogWrapper(new IcebergConfig(unreachableConfig()));
+
+ Assertions.assertDoesNotThrow(
+ () -> {
+ wrapper.close();
+ });
+ }
+
+ @Test
+ public void testMetadataCacheShouldInitializeOnFirstAccessAndClose(@TempDir
Path warehouseDir)
+ throws Exception {
+ TrackingTableMetadataCache.reset();
+ IcebergCatalogWrapper wrapper =
+ new IcebergCatalogWrapper(new
IcebergConfig(metadataConfig(warehouseDir)));
+
+ Assertions.assertEquals(0,
TrackingTableMetadataCache.INITIALIZE_COUNT.get());
+
+ TableMetadataCache cache = invokeGetMetadataCache(wrapper);
+ Assertions.assertNotNull(cache);
+ Assertions.assertEquals(1,
TrackingTableMetadataCache.INITIALIZE_COUNT.get());
+ Assertions.assertFalse(TrackingTableMetadataCache.CLOSED.get());
+
+ wrapper.close();
+
+ Assertions.assertEquals(1,
TrackingTableMetadataCache.INITIALIZE_COUNT.get());
+ Assertions.assertTrue(TrackingTableMetadataCache.CLOSED.get());
+ }
+
+ private static TableMetadataCache
invokeGetMetadataCache(IcebergCatalogWrapper wrapper)
+ throws Exception {
+ Method method =
IcebergCatalogWrapper.class.getDeclaredMethod("getMetadataCache");
+ method.setAccessible(true);
+ return (TableMetadataCache) method.invoke(wrapper);
+ }
+
+ private static Map<String, String> unreachableConfig() {
+ Map<String, String> config = new HashMap<>();
+ config.put(IcebergConstants.CATALOG_BACKEND, "jdbc");
+ config.put(IcebergConstants.URI, "jdbc:invalid://unreachable");
+ config.put(IcebergConstants.WAREHOUSE, "unused");
+ return config;
+ }
+
+ private static Map<String, String> metadataConfig(Path warehouseDir) {
+ Map<String, String> config = new HashMap<>();
+ config.put(IcebergConstants.CATALOG_BACKEND, "jdbc");
+ config.put(IcebergConstants.URI, "jdbc:sqlite::memory:");
+ config.put(IcebergConstants.WAREHOUSE, warehouseDir.toString());
+ config.put(IcebergConstants.GRAVITINO_JDBC_DRIVER, "org.sqlite.JDBC");
+ config.put(IcebergConstants.ICEBERG_JDBC_USER, "test");
+ config.put(IcebergConstants.ICEBERG_JDBC_PASSWORD, "test");
+ config.put(IcebergConstants.ICEBERG_JDBC_INITIALIZE, "false");
+ config.put(
+ IcebergConstants.TABLE_METADATA_CACHE_IMPL,
TrackingTableMetadataCache.class.getName());
+ return config;
+ }
+
+ public static class TrackingTableMetadataCache implements TableMetadataCache
{
+ private static final AtomicInteger INITIALIZE_COUNT = new AtomicInteger();
+ private static final AtomicBoolean CLOSED = new AtomicBoolean();
+
+ static void reset() {
+ INITIALIZE_COUNT.set(0);
+ CLOSED.set(false);
+ }
+
+ @Override
+ public void initialize(
+ int capacity,
+ int expireMinutes,
+ Map<String, String> catalogProperties,
+ SupportsMetadataLocation supportsMetadataLocation) {
+ INITIALIZE_COUNT.incrementAndGet();
+ }
+
+ @Override
+ public void invalidate(TableIdentifier tableIdentifier) {}
+
+ @Override
+ public Optional<TableMetadata> getTableMetadata(TableIdentifier
tableIdentifier) {
+ return Optional.empty();
+ }
+
+ @Override
+ public void updateTableMetadata(TableIdentifier tableIdentifier,
TableMetadata tableMetadata) {}
+
+ @Override
+ public void close() throws IOException {
+ CLOSED.set(true);
+ }
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index 35a5acc994..dc1f6858c0 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -91,7 +91,8 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
private final CatalogCredentialManager catalogCredentialManager;
- private final Map<String, String> catalogConfigToClients;
+ private volatile Map<String, String> catalogConfigToClients;
+ private final Object catalogConfigToClientsLock = new Object();
private final ScanPlanCache scanPlanCache;
@@ -117,7 +118,6 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
public CatalogWrapperForREST(String catalogName, IcebergConfig config) {
super(config);
- this.catalogConfigToClients = buildCatalogConfigToClients(config,
getCatalog());
// To be compatible with old properties
Map<String, String> catalogProperties =
checkForCompatibility(config.getAllConfig(), deprecatedProperties);
@@ -128,7 +128,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
public LoadTableResponse createTable(
Namespace namespace, CreateTableRequest request, boolean
requestCredential) {
LoadTableResponse loadTableResponse;
- if (catalog instanceof RESTCatalog) {
+ if (getCatalog() instanceof RESTCatalog) {
loadTableResponse = createTableInternal(namespace, request);
} else {
loadTableResponse = super.createTable(namespace, request);
@@ -145,7 +145,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
public LoadTableResponse loadTable(
TableIdentifier identifier, boolean requestCredential,
CredentialPrivilege privilege) {
LoadTableResponse loadTableResponse;
- if (catalog instanceof RESTCatalog) {
+ if (getCatalog() instanceof RESTCatalog) {
loadTableResponse = loadTableInternal(identifier);
} else {
loadTableResponse = super.loadTable(identifier);
@@ -159,8 +159,9 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
@Override
public LoadTableResponse updateTable(
TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
- if (catalog instanceof RESTCatalog) {
- return CatalogHandlers.updateTable(catalog, tableIdentifier,
updateTableRequest);
+ Catalog loadedCatalog = getCatalog();
+ if (loadedCatalog instanceof RESTCatalog) {
+ return CatalogHandlers.updateTable(loadedCatalog, tableIdentifier,
updateTableRequest);
} else {
return super.updateTable(tableIdentifier, updateTableRequest);
}
@@ -220,7 +221,17 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
}
public Map<String, String> getCatalogConfigToClient() {
- return catalogConfigToClients;
+ Map<String, String> configToClients = catalogConfigToClients;
+ if (configToClients != null) {
+ return configToClients;
+ }
+
+ synchronized (catalogConfigToClientsLock) {
+ if (catalogConfigToClients == null) {
+ catalogConfigToClients =
buildCatalogConfigToClients(getIcebergConfig(), getCatalog());
+ }
+ return catalogConfigToClients;
+ }
}
/**
@@ -406,7 +417,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
scanRequest.caseSensitive());
try {
- Table table = catalog.loadTable(tableIdentifier);
+ Table table = getCatalog().loadTable(tableIdentifier);
Optional<PlanTableScanResponse> cachedResponse =
scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, table,
scanRequest));
if (cachedResponse.isPresent()) {
@@ -594,8 +605,8 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
int expireMinutes =
config.get(IcebergConfig.SCAN_PLAN_CACHE_EXPIRE_MINUTES);
cache.initialize(capacity, expireMinutes);
LOG.info(
- "Load scan plan cache for catalog: {}, impl: {}, capacity: {}, expire
minutes: {}",
- catalog.name(),
+ "Load scan plan cache, backend: {}, impl: {}, capacity: {}, expire
minutes: {}",
+ config.get(IcebergConfig.CATALOG_BACKEND),
impl,
capacity,
expireMinutes);
@@ -630,6 +641,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
}
private LoadTableResponse createTableInternal(Namespace namespace,
CreateTableRequest request) {
+ Catalog loadedCatalog = getCatalog();
request.validate();
@@ -639,7 +651,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
TableIdentifier ident = TableIdentifier.of(namespace, request.name());
Table table =
- catalog
+ loadedCatalog
.buildTable(ident, request.schema())
.withLocation(request.location())
.withPartitionSpec(request.spec())
@@ -664,8 +676,9 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
private LoadTableResponse stageTableCreateInternal(
Namespace namespace, CreateTableRequest request) {
+ Catalog loadedCatalog = getCatalog();
TableIdentifier ident = TableIdentifier.of(namespace, request.name());
- if (catalog.tableExists(ident)) {
+ if (loadedCatalog.tableExists(ident)) {
throw new AlreadyExistsException("Table already exists: %s", ident);
}
@@ -679,7 +692,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
location = request.location();
} else {
Table table =
- catalog
+ loadedCatalog
.buildTable(ident, request.schema())
.withPartitionSpec(request.spec())
.withSortOrder(request.writeOrder())
@@ -706,7 +719,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
}
private LoadTableResponse loadTableInternal(TableIdentifier ident) {
- Table table = catalog.loadTable(ident);
+ Table table = getCatalog().loadTable(ident);
if (table instanceof BaseTable) {
Map<String, String> properties = retrieveFileIOProperties(table.io());
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index f0b8b400c5..8d8edcf185 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -130,7 +130,7 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
CatalogWrapperForREST rest = new CatalogWrapperForREST(catalogName,
icebergConfig);
AuthenticationConfig authenticationConfig =
new AuthenticationConfig(icebergConfig.getAllConfig());
- if (rest.getCatalog() instanceof SupportsKerberos &&
authenticationConfig.isKerberosAuth()) {
+ if (authenticationConfig.isKerberosAuth() && rest.getCatalog() instanceof
SupportsKerberos) {
return (CatalogWrapperForREST)
new KerberosAwareIcebergCatalogProxy(rest).getProxy(catalogName,
icebergConfig);
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
index 17b69c5b2b..73c69f427a 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.iceberg.catalog.Catalog;
@@ -33,6 +34,8 @@ import org.junit.jupiter.api.Test;
public class TestCatalogWrapperForREST {
+ private static final AtomicBoolean CONSTRUCTION_IN_PROGRESS = new
AtomicBoolean(false);
+
@Test
void testCheckPropertiesForCompatibility() {
ImmutableMap<String, String> deprecatedMap = ImmutableMap.of("deprecated",
"new");
@@ -153,4 +156,37 @@ public class TestCatalogWrapperForREST {
IllegalArgumentException.class,
() -> CatalogWrapperForREST.buildCatalogConfigToClients(config,
catalog));
}
+
+ @Test
+ void testConstructorDoesNotLoadCatalogEagerly() {
+ IcebergConfig config =
+ new IcebergConfig(
+ ImmutableMap.of(
+ IcebergConstants.CATALOG_BACKEND,
+ "memory",
+ IcebergConstants.WAREHOUSE,
+ "/tmp/warehouse"));
+
+ CONSTRUCTION_IN_PROGRESS.set(true);
+ try {
+ Assertions.assertDoesNotThrow(() -> new
LazyCheckCatalogWrapperForREST("test", config));
+ } finally {
+ CONSTRUCTION_IN_PROGRESS.set(false);
+ }
+ }
+
+ private static class LazyCheckCatalogWrapperForREST extends
CatalogWrapperForREST {
+
+ LazyCheckCatalogWrapperForREST(String catalogName, IcebergConfig config) {
+ super(catalogName, config);
+ }
+
+ @Override
+ public Catalog getCatalog() {
+ if (CONSTRUCTION_IN_PROGRESS.get()) {
+ throw new AssertionError("Catalog should not be loaded during wrapper
construction");
+ }
+ return super.getCatalog();
+ }
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
index 94c29e5de1..d5eccaf853 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
@@ -91,7 +91,7 @@ public class CatalogWrapperForTest extends
CatalogWrapperForREST {
private void appendSampleData(Namespace namespace, String tableName) {
try {
- Table table = catalog.loadTable(TableIdentifier.of(namespace,
tableName));
+ Table table = getCatalog().loadTable(TableIdentifier.of(namespace,
tableName));
// Append multiple times to create multiple snapshots for incremental
scan testing
for (int i = 0; i < 3; i++) {
Path tempFile = Files.createTempFile("plan-scan-" + i, ".parquet");