This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 98c1e93a26 [#10555] improve(flink): Support enable session catalogs
using gravitino connector (#10560)
98c1e93a26 is described below
commit 98c1e93a26fa059599526811a145816d2f2b0c9e
Author: MaSai <[email protected]>
AuthorDate: Tue Mar 31 22:08:09 2026 +0800
[#10555] improve(flink): Support enable session catalogs using gravitino
connector (#10560)
### What changes were proposed in this pull request?
This PR introduces a mechanism that allows non-Gravitino connectors
(standard Flink connectors such as `jdbc` and `hive`) to be used alongside
Gravitino-managed catalogs in the Flink connector.
Details are as follows:
CREATE CATALOG: Gravitino-managed catalogs are persisted to the Gravitino
server; non-Gravitino catalogs are stored in the in-memory store only.
GET / USE CATALOG: The in-memory store is checked first. If the catalog
is not found there, it is retrieved from the Gravitino server.
DROP CATALOG: The in-memory store is checked first. If the catalog
exists there it is removed from memory; otherwise it is removed from the
Gravitino server.
SHOW / LIST CATALOGS: Returns the combined set of catalogs from both the
in-memory store and the Gravitino server.
Session scope: Catalogs stored only in memory are session-scoped and
do not survive a Flink restart.
Name conflict: If a catalog with the same name exists in both stores,
the in-memory entry takes precedence.
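The routing rules above can be sketched with a simplified, dependency-free model. This is only an illustration, not the connector's `GravitinoSessionCatalogStore`: the class name `SessionRoutingSketch`, the two maps standing in for the stores, and the two-element set of managed types are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the routing rules; each map stands in for one store. */
public class SessionRoutingSketch {
  // Assumed subset of Gravitino-managed type identifiers, for illustration only.
  private static final Set<String> MANAGED_TYPES = Set.of("gravitino-hive", "gravitino-iceberg");

  // Catalog name -> catalog type.
  final Map<String, String> memoryStore = new LinkedHashMap<>();
  final Map<String, String> gravitinoStore = new LinkedHashMap<>();

  /** CREATE CATALOG: managed types go to the server-backed store, all others stay in memory. */
  void storeCatalog(String name, String type) {
    if (MANAGED_TYPES.contains(type)) {
      gravitinoStore.put(name, type);
    } else {
      memoryStore.put(name, type);
    }
  }

  /** GET / USE CATALOG: the in-memory store is consulted first. */
  Optional<String> getCatalog(String name) {
    String type = memoryStore.containsKey(name) ? memoryStore.get(name) : gravitinoStore.get(name);
    return Optional.ofNullable(type);
  }

  /** DROP CATALOG: remove from memory if present there, otherwise from the server-backed store. */
  void removeCatalog(String name) {
    if (memoryStore.containsKey(name)) {
      memoryStore.remove(name);
    } else {
      gravitinoStore.remove(name);
    }
  }

  /** SHOW / LIST CATALOGS: the union of both stores (sorted here for stable output). */
  Set<String> listCatalogs() {
    Set<String> names = new TreeSet<>(memoryStore.keySet());
    names.addAll(gravitinoStore.keySet());
    return names;
  }

  public static void main(String[] args) {
    SessionRoutingSketch store = new SessionRoutingSketch();
    store.storeCatalog("lake", "gravitino-hive");
    store.storeCatalog("scratch", "generic_in_memory");
    System.out.println(store.listCatalogs()); // prints [lake, scratch]
    store.removeCatalog("scratch");
    System.out.println(store.getCatalog("scratch").isPresent()); // prints false
  }
}
```

Because the in-memory map is checked first, a session catalog that shares a name with a server-side catalog shadows it until the session catalog is dropped.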
### Why are the changes needed?
Previously, the Gravitino Flink connector assumed all catalogs were
Gravitino-managed. This prevented users from
using standard Flink connectors (e.g., for temporary catalogs not yet
supported or governed by Gravitino)
within the same environment. This change provides flexibility to mix
governed and ungoverned catalogs.
Fix: #10555
### Does this PR introduce any user-facing change?
Yes. Users can set
`table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport` to
`true` in `flink-conf.yaml` or via `TableEnvironment` to enable this behavior.
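As a rough sketch of how the flag resolves: the option key defined in this commit's `GravitinoCatalogStoreFactoryOptions` is `gravitino.enableSessionCatalogSupport`, it defaults to `false`, and in the Flink configuration it appears under the `table.catalog-store.gravitino.` prefix. The class below is hypothetical and uses a plain `Map` in place of Flink's configuration classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; a plain Map stands in for the Flink configuration. */
public class SessionCatalogFlagSketch {
  static final String STORE_PREFIX = "table.catalog-store.gravitino.";
  static final String OPTION_KEY = "gravitino.enableSessionCatalogSupport";

  /** Returns the flag value, falling back to the documented default of false. */
  static boolean sessionCatalogSupportEnabled(Map<String, String> flinkConf) {
    return Boolean.parseBoolean(flinkConf.getOrDefault(STORE_PREFIX + OPTION_KEY, "false"));
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("table.catalog-store.kind", "gravitino");
    System.out.println(sessionCatalogSupportEnabled(conf)); // prints false

    conf.put(STORE_PREFIX + OPTION_KEY, "true");
    System.out.println(sessionCatalogSupportEnabled(conf)); // prints true
  }
}
```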
### How was this patch tested?
Unit tests: TestGravitinoSessionCatalogStore and TestGravitinoCatalogStoreFactory.
Integration test: FlinkSupportsSessionCatalogIT.
---------
Co-authored-by: Copilot <[email protected]>
---
docs/flink-connector/flink-connector.md | 14 +
.../store/GravitinoCatalogStoreFactory.java | 23 +-
.../store/GravitinoCatalogStoreFactoryOptions.java | 6 +
.../store/GravitinoSessionCatalogStore.java | 131 ++++++++
.../flink/connector/utils/FactoryUtils.java | 18 ++
.../session/FlinkSupportsSessionCatalogIT.java | 359 +++++++++++++++++++++
.../store/TestGravitinoCatalogStoreFactory.java | 205 ++++++++++++
.../store/TestGravitinoSessionCatalogStore.java | 216 +++++++++++++
8 files changed, 970 insertions(+), 2 deletions(-)
diff --git a/docs/flink-connector/flink-connector.md b/docs/flink-connector/flink-connector.md
index 1d0c31664e..dcb84a2c21 100644
--- a/docs/flink-connector/flink-connector.md
+++ b/docs/flink-connector/flink-connector.md
@@ -35,8 +35,20 @@ This capability allows users to perform federation queries, accessing data from
| table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating |
+| table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport | boolean | false | Whether to enable support for Flink's session catalog in the Gravitino catalog store. | No | 1.3.0 |
| table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 |
+When `table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport` is set to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a `GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory store to support Flink's session catalog. When `false` (the default), only `GravitinoCatalogStore` is used.
+
+When session catalog support is enabled, the following behaviors apply:
+
+- **CREATE CATALOG**: Gravitino-managed catalogs (e.g. `gravitino-hive`, `gravitino-iceberg`) are persisted to the Gravitino server; non-Gravitino-managed catalogs (e.g. `hive`, `jdbc`, or any custom connector type) are stored in the in-memory store only.
+- **GET / USE CATALOG**: The in-memory store is checked first. If the catalog is not found there, it is retrieved from the Gravitino server.
+- **DROP CATALOG**: The in-memory store is checked first. If the catalog exists there it is removed from memory; otherwise it is removed from the Gravitino server.
+- **SHOW / LIST CATALOGS**: Returns the combined set of catalogs from both the in-memory store and the Gravitino server.
+- **Session scope**: Catalogs stored only in memory are session-scoped and will not survive when Flink restarts.
+- **Name conflict**: If a catalog with the same name exists in both stores, the in-memory entry takes precedence.
+
To configure the Gravitino client, use properties prefixed with `table.catalog-store.gravitino.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `table.catalog-store.gravitino.` prefix.
**Example:** Setting `table.catalog-store.gravitino.gravitino.client.socketTimeoutMs` is equivalent to setting `gravitino.client.socketTimeoutMs` for the Gravitino client.
@@ -48,6 +60,7 @@ Set the flink configuration in flink-conf.yaml.
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: metalake_demo
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
+table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport: true
table.catalog-store.gravitino.gravitino.client.socketTimeoutMs: 60000
table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs: 60000
```
@@ -57,6 +70,7 @@ final Configuration configuration = new Configuration();
configuration.setString("table.catalog-store.kind", "gravitino");
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "metalake_demo");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
+configuration.setBoolean("table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport", true);
configuration.setString("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000");
configuration.setString("table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs", "60000");
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration);
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java
index b59b9cfc94..1cc1f0ab9a 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.flink.connector.store;
import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_CLIENT_CONFIG;
+import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_ENABLE_SESSION_CATALOG_SUPPORT;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_METALAKE;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_URI;
@@ -34,6 +35,7 @@ import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -43,10 +45,14 @@ import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
/** The Factory for creating {@link GravitinoCatalogStore}. */
public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
private GravitinoCatalogManager catalogManager;
+ private boolean enableSessionCatalogSupport;
+ private GravitinoCatalogStore gravitinoCatalogStore;
+ private GenericInMemoryCatalogStore memoryCatalogStore;
+ private CatalogStore catalogStore;
@Override
public CatalogStore createCatalogStore() {
- return new GravitinoCatalogStore(catalogManager);
+ return catalogStore;
}
@Override
@@ -67,6 +73,16 @@ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
this.catalogManager =
GravitinoCatalogManager.create(gravitinoUri, gravitinoName, extractClientConfig(options));
+ this.enableSessionCatalogSupport = options.get(GRAVITINO_ENABLE_SESSION_CATALOG_SUPPORT);
+ this.gravitinoCatalogStore = new GravitinoCatalogStore(catalogManager);
+ if (enableSessionCatalogSupport) {
+ this.memoryCatalogStore = new GenericInMemoryCatalogStore();
+ this.memoryCatalogStore.open();
+ this.catalogStore =
+ new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore);
+ } else {
+ this.catalogStore = gravitinoCatalogStore;
+ }
}
@Override
@@ -74,6 +90,9 @@ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
if (catalogManager != null) {
catalogManager.close();
}
+ if (memoryCatalogStore != null) {
+ memoryCatalogStore.close();
+ }
}
@Override
@@ -88,7 +107,7 @@ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return ImmutableSet.of(GRAVITINO_CLIENT_CONFIG);
+ return ImmutableSet.of(GRAVITINO_CLIENT_CONFIG, GRAVITINO_ENABLE_SESSION_CATALOG_SUPPORT);
}
@VisibleForTesting
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
index cda4297881..f2ac8b52ff 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
@@ -40,6 +40,12 @@ public class GravitinoCatalogStoreFactoryOptions {
.stringType()
.noDefaultValue()
.withDescription("The name of Gravitino metalake");
+ public static final ConfigOption<Boolean> GRAVITINO_ENABLE_SESSION_CATALOG_SUPPORT =
+ ConfigOptions.key("gravitino.enableSessionCatalogSupport")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable support for Flink's session catalog in the Gravitino catalog store");
public static final ConfigOption<Map<String, String>> GRAVITINO_CLIENT_CONFIG =
ConfigOptions.key("gravitino.client")
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java
new file mode 100644
index 0000000000..ffb0045247
--- /dev/null
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isGravitinoManagedCatalogType;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A catalog store that combines a session-scoped in-memory {@link GenericInMemoryCatalogStore} with
+ * a persistent {@link GravitinoCatalogStore}.
+ *
+ * <p>Routing is based on the catalog type declared in the {@link CatalogDescriptor}:
+ *
+ * <ul>
+ * <li><b>Gravitino-managed catalog types</b> (e.g. {@code gravitino-hive}, {@code
+ * gravitino-iceberg}) are persisted to the Gravitino-backed store and are visible across all
+ * Flink sessions and Gravitino clients that share the same metalake.
+ * <li><b>All other catalog types</b> (e.g. {@code generic_in_memory}, third-party connectors) are
+ * stored in the session-scoped in-memory store only and are not persisted to Gravitino.
+ * </ul>
+ *
+ * <p>When retrieving, listing, or removing catalogs, entries in the in-memory store take precedence
+ * over entries in the Gravitino-backed store.
+ *
+ * <p>This store is intended to be used per Flink session, keeping transient catalogs in memory
+ * while delegating persistent catalogs to Apache Gravitino.
+ */
+public class GravitinoSessionCatalogStore extends AbstractCatalogStore {
+ private final GenericInMemoryCatalogStore memoryCatalogStore;
+ private final GravitinoCatalogStore gravitinoCatalogStore;
+
+ public GravitinoSessionCatalogStore(
+ GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore memoryCatalogStore) {
+ Preconditions.checkArgument(gravitinoCatalogStore != null, "CatalogStore cannot be null");
+ Preconditions.checkArgument(memoryCatalogStore != null, "MemoryCatalogStore cannot be null");
+ this.gravitinoCatalogStore = gravitinoCatalogStore;
+ this.memoryCatalogStore = memoryCatalogStore;
+ }
+
+ @Override
+ public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+ throws CatalogException {
+ String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
+ if (catalogType == null) {
+ throw new CatalogException(
+ String.format(
+ "Cannot store catalog '%s': '%s' is not set in the catalog descriptor. "
+ + "Please specify a catalog type.",
+ catalogName, CommonCatalogOptions.CATALOG_TYPE.key()));
+ }
+ if (isGravitinoManagedCatalogType(catalogType)) {
+ gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
+ } else {
+ memoryCatalogStore.storeCatalog(catalogName, descriptor);
+ }
+ }
+
+ /**
+ * Removes the specified catalog.
+ *
+ * @param catalogName name of the catalog to remove
+ * @param ignoreIfNotExists if true, ignore when the catalog does not exist
+ * @throws CatalogException if the catalog cannot be removed
+ */
+ @Override
+ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
+ if (memoryCatalogStore.contains(catalogName)) {
+ memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
+ } else {
+ gravitinoCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
+ }
+ }
+
+ /**
+ * Get a catalog by name.
+ *
+ * @param catalogName name of the catalog to retrieve
+ * @return the requested catalog or empty if the catalog does not exist
+ * @throws CatalogException throw a CatalogException when the Catalog cannot be created.
+ */
+ @Override
+ public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
+ Optional<CatalogDescriptor> descriptor = memoryCatalogStore.getCatalog(catalogName);
+ return descriptor.isPresent() ? descriptor : gravitinoCatalogStore.getCatalog(catalogName);
+ }
+
+ @Override
+ public Set<String> listCatalogs() throws CatalogException {
+ Set<String> catalogs = new HashSet<>();
+ catalogs.addAll(memoryCatalogStore.listCatalogs());
+ try {
+ catalogs.addAll(gravitinoCatalogStore.listCatalogs());
+ } catch (CatalogException e) {
+ throw new CatalogException("Failed to list catalog.", e);
+ } catch (RuntimeException e) {
+ throw new CatalogException("Failed to list catalog.", e);
+ }
+ return catalogs;
+ }
+
+ @Override
+ public boolean contains(String catalogName) throws CatalogException {
+ return memoryCatalogStore.contains(catalogName) || gravitinoCatalogStore.contains(catalogName);
+ }
+}
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java
index efb04a9ab4..273da3f4ac 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java
@@ -21,12 +21,17 @@ package org.apache.gravitino.flink.connector.utils;
import static org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions;
import static org.apache.flink.table.factories.FactoryUtil.validateWatermarkOptions;
+import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactoryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +41,19 @@ public class FactoryUtils {
private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);
+ /** The set of catalog type identifiers managed by Gravitino. */
+ private static final ImmutableSet<String> GRAVITINO_CATALOG_TYPES =
+ ImmutableSet.of(
+ GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+ GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+ GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER,
+ GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER,
+ GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
+
+ public static boolean isGravitinoManagedCatalogType(String type) {
+ return GRAVITINO_CATALOG_TYPES.contains(type);
+ }
+
/**
* Utility for working with {@link Factory}s. The {@link GravitinoCatalogFactoryHelper} override
* the {@link FactoryUtil.CatalogFactoryHelper#validate()} method to validate the options. For the
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/session/FlinkSupportsSessionCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/session/FlinkSupportsSessionCatalogIT.java
new file mode 100644
index 0000000000..4345d4ab9d
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/session/FlinkSupportsSessionCatalogIT.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.session;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests verifying that the {@code gravitino.enableSessionCatalogSupport} option
+ * correctly routes catalog operations to the appropriate store:
+ *
+ * <ul>
+ * <li>Gravitino-managed catalog types (e.g. {@code gravitino-hive}) are persisted to the
+ * Gravitino server via {@link
+ * org.apache.gravitino.flink.connector.store.GravitinoCatalogStore}.
+ * <li>Session-scoped catalog types (e.g. {@code generic_in_memory}) are held in memory only and
+ * never reach the Gravitino server.
+ * <li>Both stores contribute to the catalog list returned by {@code SHOW CATALOGS}.
+ * </ul>
+ *
+ * <p>Requires a running Hive Docker container for the Gravitino-managed catalog tests.
+ */
+@Tag("gravitino-docker-test")
+public class FlinkSupportsSessionCatalogIT extends FlinkEnvIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkSupportsSessionCatalogIT.class);
+
+ private static String hiveConfDir;
+ private static Path hiveConfDirPath;
+
+ @Override
+ protected String getProvider() {
+ return "hive";
+ }
+
+ /**
+ * Overrides the default Flink environment to enable {@code
+ * gravitino.enableSessionCatalogSupport=true}, which wires a {@link
+ * org.apache.gravitino.flink.connector.store.GravitinoSessionCatalogStore} as the catalog store.
+ */
+ @Override
+ protected void initFlinkEnv() {
+ initHiveConfDir();
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+ configuration.setString("table.catalog-store.gravitino.gravitino.metalake", GRAVITINO_METALAKE);
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri", gravitinoUri);
+ configuration.setBoolean(
+ "table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport", true);
+ EnvironmentSettings settings =
+ EnvironmentSettings.newInstance().withConfiguration(configuration).inBatchMode().build();
+ tableEnv = TableEnvironment.create(settings);
+ LOG.info(
+ "Flink env with enableSessionCatalogSupport=true initialized, Gravitino uri: {}.",
+ gravitinoUri);
+ }
+
+ @BeforeAll
+ void sessionCatalogStartUp() {
+ Preconditions.checkArgument(metalake != null, "metalake should not be null");
+ LOG.info("FlinkSupportsSessionCatalogIT startup complete.");
+ }
+
+ @AfterAll
+ static void sessionCatalogStop() {
+ Preconditions.checkArgument(metalake != null, "metalake should not be null");
+ deleteHiveConfDir();
+ LOG.info("FlinkSupportsSessionCatalogIT teardown complete.");
+ }
+
+ /**
+ * A Gravitino-managed catalog type ({@code gravitino-hive}) created via {@link
+ * TableEnvironment#createCatalog} must be persisted to the Gravitino server.
+ */
+ @Test
+ public void testCreateGravitinoHiveCatalog() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ int numCatalogs = tableEnv.listCatalogs().length;
+ String catalogName = "session_it_gravitino_hive";
+
+ Configuration conf = new Configuration();
+ conf.set(CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
+ conf.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, hiveConfDir);
+ conf.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, hiveMetastoreUri);
+ CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf);
+ tableEnv.createCatalog(catalogName, descriptor);
+
+ try {
+ Assertions.assertTrue(
+ metalake.catalogExists(catalogName),
+ "Gravitino-managed catalog must be persisted to the Gravitino server");
+ String[] catalogs = tableEnv.listCatalogs();
+ Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog");
+ org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
+ Assertions.assertEquals(
+ hiveMetastoreUri, gravitinoCatalog.properties().get(HiveConstants.METASTORE_URIS));
+ } finally {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+ Assertions.assertFalse(metalake.catalogExists(catalogName));
+ }
+ }
+
+ /**
+ * A Gravitino-managed catalog type ({@code gravitino-hive}) created via Flink SQL {@code CREATE
+ * CATALOG} must be persisted to the Gravitino server.
+ */
+ @Test
+ public void testCreateGravitinoHiveCatalogUsingSQL() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ int numCatalogs = tableEnv.listCatalogs().length;
+ String catalogName = "session_it_gravitino_hive_sql";
+
+ tableEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH ("
+ + "'type'='%s',"
+ + "'hive-conf-dir'='%s',"
+ + "'hive.metastore.uris'='%s'"
+ + ")",
+ catalogName,
+ GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+ hiveConfDir,
+ hiveMetastoreUri));
+
+ try {
+ Assertions.assertTrue(
+ metalake.catalogExists(catalogName),
+ "Gravitino-managed catalog must be persisted to the Gravitino server");
+ String[] catalogs = tableEnv.listCatalogs();
+ Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog");
+ } finally {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+ Assertions.assertFalse(metalake.catalogExists(catalogName));
+ }
+ }
+
+ /**
+ * A session-scoped catalog type ({@code generic_in_memory}) created via Flink SQL must be
+ * accessible in Flink but must NOT be persisted to the Gravitino server.
+ */
+ @Test
+ public void testCreateSessionScopedCatalog() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ int numCatalogs = tableEnv.listCatalogs().length;
+ String catalogName = "session_it_memory_catalog";
+
+ tableEnv.executeSql(
+ String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", catalogName));
+
+ try {
+ Assertions.assertFalse(
+ metalake.catalogExists(catalogName),
+ "Session-scoped catalog must NOT be persisted to the Gravitino server");
+ String[] catalogs = tableEnv.listCatalogs();
+ Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog");
+ } finally {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+ }
+ }
+
+ /**
+ * {@code SHOW CATALOGS} must return catalogs from both the Gravitino-backed store and the
+ * in-memory store when {@code enableSessionCatalogSupport=true}.
+ */
+ @Test
+ public void testListCatalogsReturnsBothStores() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ int numCatalogs = tableEnv.listCatalogs().length;
+ String gravitinoCatalogName = "session_it_list_gravitino_hive";
+ String sessionCatalogName = "session_it_list_memory_catalog";
+
+ tableEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH ("
+ + "'type'='%s',"
+ + "'hive-conf-dir'='%s',"
+ + "'hive.metastore.uris'='%s'"
+ + ")",
+ gravitinoCatalogName,
+ GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+ hiveConfDir,
+ hiveMetastoreUri));
+ tableEnv.executeSql(
+ String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", sessionCatalogName));
+
+ try {
+ String[] catalogs = tableEnv.listCatalogs();
+ Assertions.assertEquals(numCatalogs + 2, catalogs.length, "Should have two more catalogs");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(gravitinoCatalogName),
+ "Gravitino-managed catalog must appear in SHOW CATALOGS");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(sessionCatalogName),
+ "Session-scoped catalog must appear in SHOW CATALOGS");
+ } finally {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ tableEnv.executeSql("DROP CATALOG " + gravitinoCatalogName);
+ tableEnv.executeSql("DROP CATALOG " + sessionCatalogName);
+ Assertions.assertFalse(metalake.catalogExists(gravitinoCatalogName));
+ }
+ }
+
+ /**
+ * Dropping a Gravitino-managed catalog via Flink SQL must remove it from the Gravitino server and
+ * from the Flink catalog list.
+ */
+ @Test
+ public void testDropGravitinoHiveCatalog() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ String catalogName = "session_it_drop_gravitino_hive";
+
+ tableEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH ("
+ + "'type'='%s',"
+ + "'hive-conf-dir'='%s',"
+ + "'hive.metastore.uris'='%s'"
+ + ")",
+ catalogName,
+ GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+ hiveConfDir,
+ hiveMetastoreUri));
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+
+ Assertions.assertFalse(
+ metalake.catalogExists(catalogName),
+ "Dropped Gravitino-managed catalog must be removed from the Gravitino server");
+ Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+ Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped");
+ }
+
+ /**
+ * Dropping a session-scoped catalog via Flink SQL must remove it from the in-memory store without
+ * error, since it was never registered in Gravitino.
+ */
+ @Test
+ public void testDropSessionScopedCatalog() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ String catalogName = "session_it_drop_memory_catalog";
+
+ tableEnv.executeSql(
+ String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')",
catalogName));
+ Assertions.assertFalse(
+ metalake.catalogExists(catalogName),
+ "Session-scoped catalog must NOT be in Gravitino before drop");
+
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+
+ Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+ Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped");
+ }
+
+ private static void initHiveConfDir() {
+ if (hiveConfDir != null) {
+ return;
+ }
+ try {
+ hiveConfDirPath = Files.createTempDirectory("flink-session-hive-conf");
+ Path hiveSite = hiveConfDirPath.resolve("hive-site.xml");
+ String hiveSiteXml =
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"
+ + "<configuration>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.sasl.enabled</name>\n"
+ + " <value>false</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.uris</name>\n"
+ + " <value>"
+ + hiveMetastoreUri
+ + "</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hadoop.security.authentication</name>\n"
+ + " <value>simple</value>\n"
+ + " </property>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.warehouse.dir</name>\n"
+ + " <value>"
+ + warehouse
+ + "</value>\n"
+ + " </property>\n"
+ + "</configuration>\n";
+ Files.write(hiveSite, hiveSiteXml.getBytes(StandardCharsets.UTF_8));
+ hiveConfDir = hiveConfDirPath.toAbsolutePath().toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to prepare hive conf dir for ITs", e);
+ }
+ }
+
+ private static void deleteHiveConfDir() {
+ if (hiveConfDirPath == null) {
+ return;
+ }
+ try (Stream<Path> walk = Files.walk(hiveConfDirPath)) {
+ walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete temp hive conf dir: {}", hiveConfDirPath, e);
+ } finally {
+ hiveConfDirPath = null;
+ hiveConfDir = null;
+ }
+ }
+}
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStoreFactory.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStoreFactory.java
new file mode 100644
index 0000000000..86164adb48
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStoreFactory.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.store;
+
+import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
+import static org.mockito.Mockito.mock;
+
+import java.lang.reflect.Field;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests verifying that {@link GravitinoCatalogStoreFactory} wires the correct {@link
+ * CatalogStore} implementation based on the {@code gravitino.enableSessionCatalogSupport} option.
+ */
+public class TestGravitinoCatalogStoreFactory {
+
+ /**
+ * When {@code gravitino.enableSessionCatalogSupport=true} the factory must return a {@link
+ * GravitinoSessionCatalogStore}.
+ */
+ @Test
+ void testCreateCatalogStore_sessionCatalogEnabled_returnsGravitinoSessionCatalogStore()
+ throws Exception {
+ GravitinoCatalogStoreFactory factory = factoryWith(true);
+
+ CatalogStore store = factory.createCatalogStore();
+
+ Assertions.assertInstanceOf(GravitinoSessionCatalogStore.class, store);
+ }
+
+ /**
+ * When {@code gravitino.enableSessionCatalogSupport=false} (the default) the factory must return
+ * a plain {@link GravitinoCatalogStore}, not a session store.
+ */
+ @Test
+ void testCreateCatalogStore_sessionCatalogDisabled_returnsGravitinoCatalogStore()
+ throws Exception {
+ GravitinoCatalogStoreFactory factory = factoryWith(false);
+
+ CatalogStore store = factory.createCatalogStore();
+
+ Assertions.assertInstanceOf(GravitinoCatalogStore.class, store);
+ Assertions.assertFalse(store instanceof GravitinoSessionCatalogStore);
+ }
+
+ /**
+ * Verifies that {@code gravitino.enableSessionCatalogSupport=true} is correctly parsed from the
+ * Flink configuration by the factory's option helper.
+ */
+ @Test
+ void testOptionParsing_enableSessionCatalogSupportTrue_isReadFromConfig() {
+ Configuration configuration = baseConfiguration();
+ configuration.setBoolean(
+ "table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport", true);
+
+ boolean parsed = parseEnableSessionCatalogSupport(configuration);
+
+ Assertions.assertTrue(parsed);
+ }
+
+ /**
+ * Verifies that {@code gravitino.enableSessionCatalogSupport} defaults to {@code false} when
+ * absent from the configuration.
+ */
+ @Test
+ void testOptionParsing_enableSessionCatalogSupportAbsent_defaultsToFalse() {
+ boolean parsed = parseEnableSessionCatalogSupport(baseConfiguration());
+
+ Assertions.assertFalse(parsed);
+ }
+
+ /**
+ * End-to-end: option parsed as {@code true} from {@link Configuration} must wire through to
+ * {@link GravitinoSessionCatalogStore} being returned by {@link
+ * GravitinoCatalogStoreFactory#createCatalogStore()}.
+ */
+ @Test
+ void testEndToEnd_sessionCatalogEnabled_returnsGravitinoSessionCatalogStore() throws Exception {
+ Configuration configuration = baseConfiguration();
+ configuration.setBoolean(
+ "table.catalog-store.gravitino.gravitino.enableSessionCatalogSupport", true);
+
+ boolean enableSessionCatalogSupport = parseEnableSessionCatalogSupport(configuration);
+ GravitinoCatalogStoreFactory factory = factoryWith(enableSessionCatalogSupport);
+
+ CatalogStore store = factory.createCatalogStore();
+
+ Assertions.assertInstanceOf(GravitinoSessionCatalogStore.class, store);
+ }
+
+ /**
+ * End-to-end: option absent from {@link Configuration} (defaults to {@code false}) must wire
+ * through to a plain {@link GravitinoCatalogStore} being returned by {@link
+ * GravitinoCatalogStoreFactory#createCatalogStore()}.
+ */
+ @Test
+ void testEndToEnd_sessionCatalogAbsent_returnsPlainGravitinoCatalogStore() throws Exception {
+ boolean enableSessionCatalogSupport = parseEnableSessionCatalogSupport(baseConfiguration());
+ GravitinoCatalogStoreFactory factory = factoryWith(enableSessionCatalogSupport);
+
+ CatalogStore store = factory.createCatalogStore();
+
+ Assertions.assertInstanceOf(GravitinoCatalogStore.class, store);
+ Assertions.assertFalse(store instanceof GravitinoSessionCatalogStore);
+ }
+
+ // -------------------------------------------------------------------------
+ // helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * Builds a {@link GravitinoCatalogStoreFactory} with {@code enableSessionCatalogSupport} and a
+ * mocked {@link GravitinoCatalogManager} injected via reflection, bypassing {@code open()} which
+ * requires a live Gravitino server.
+ */
+ private static GravitinoCatalogStoreFactory factoryWith(boolean enableSessionCatalogSupport)
+ throws Exception {
+ GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory();
+ GravitinoCatalogManager mockCatalogManager = mock(GravitinoCatalogManager.class);
+ setField(factory, "catalogManager", mockCatalogManager);
+ setField(factory, "enableSessionCatalogSupport", enableSessionCatalogSupport);
+ GravitinoCatalogStore gravitinoStore = new GravitinoCatalogStore(mockCatalogManager);
+ setField(factory, "gravitinoCatalogStore", gravitinoStore);
+ if (enableSessionCatalogSupport) {
+ GenericInMemoryCatalogStore memoryStore = new GenericInMemoryCatalogStore();
+ memoryStore.open();
+ setField(factory, "memoryCatalogStore", memoryStore);
+ setField(
+ factory, "catalogStore", new GravitinoSessionCatalogStore(gravitinoStore, memoryStore));
+ } else {
+ setField(factory, "catalogStore", gravitinoStore);
+ }
+ return factory;
+ }
+
+ /**
+ * Calling {@link GravitinoCatalogStoreFactory#createCatalogStore()} more than once must reuse the
+ * same store instances rather than creating new ones each time.
+ */
+ @Test
+ void testCreateCatalogStore_calledTwice_reusesStoreInstances() throws Exception {
+ GravitinoCatalogStoreFactory factory = factoryWith(true);
+
+ CatalogStore first = factory.createCatalogStore();
+ CatalogStore second = factory.createCatalogStore();
+
+ Assertions.assertSame(
+ first, second, "createCatalogStore() must return the same instance on repeated calls");
+ }
+
+ /**
+ * Parses the value of {@code gravitino.enableSessionCatalogSupport} from the given configuration
+ * using the real Flink factory-helper path (same approach as {@link TestGravitinoFlinkConfig}).
+ */
+ private static boolean parseEnableSessionCatalogSupport(Configuration configuration) {
+ CatalogStoreFactory.Context context =
+ TableFactoryUtil.buildCatalogStoreFactoryContext(
+ configuration, TestGravitinoCatalogStoreFactory.class.getClassLoader());
+ FactoryUtil.FactoryHelper<CatalogStoreFactory> helper =
+ createCatalogStoreFactoryHelper(new GravitinoCatalogStoreFactory(), context);
+ helper.validate();
+ return helper
+ .getOptions()
+ .get(GravitinoCatalogStoreFactoryOptions.GRAVITINO_ENABLE_SESSION_CATALOG_SUPPORT);
+ }
+
+ private static Configuration baseConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+ configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "test_metalake");
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090");
+ return configuration;
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java
new file mode 100644
index 0000000000..1b6011757a
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.store;
+
+import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isGravitinoManagedCatalogType;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoSessionCatalogStore {
+
+ /**
+ * A Gravitino-managed (built-in) catalog type — verified by {@link
+ * org.apache.gravitino.flink.connector.utils.FactoryUtils#isGravitinoManagedCatalogType}.
+ * Catalogs of this type are routed to the Gravitino-backed store.
+ */
+ private static final String GRAVITINO_CATALOG_TYPE = "gravitino-hive";
+
+ /**
+ * A third-party (session) catalog type that Gravitino does not manage. Catalogs of this type are
+ * routed to the in-memory store and exist only for the lifetime of the Flink session.
+ */
+ private static final String SESSION_CATALOG_TYPE = "hive";
+
+ private GravitinoCatalogStore gravitinoCatalogStore;
+ private GenericInMemoryCatalogStore memoryCatalogStore;
+ private GravitinoSessionCatalogStore sessionCatalogStore;
+
+ @BeforeEach
+ void setUp() {
+ Assertions.assertTrue(
+ isGravitinoManagedCatalogType(GRAVITINO_CATALOG_TYPE),
+ "GRAVITINO_CATALOG_TYPE must be recognised by isGravitinoManagedCatalogType()");
+ Assertions.assertFalse(
+ isGravitinoManagedCatalogType(SESSION_CATALOG_TYPE),
+ "SESSION_CATALOG_TYPE must NOT be recognised by isGravitinoManagedCatalogType()");
+ gravitinoCatalogStore = mock(GravitinoCatalogStore.class);
+ memoryCatalogStore = mock(GenericInMemoryCatalogStore.class);
+ sessionCatalogStore =
+ new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore);
+ }
+
+ // -------------------------------------------------------------------------
+ // storeCatalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testStoreCatalog_gravitinoCatalog_storesInGravitino() throws CatalogException {
+ CatalogDescriptor descriptor = descriptorWithType(GRAVITINO_CATALOG_TYPE);
+
+ sessionCatalogStore.storeCatalog("gravitino-hive", descriptor);
+
+ verify(gravitinoCatalogStore).storeCatalog("gravitino-hive", descriptor);
+ verify(memoryCatalogStore, never()).storeCatalog("gravitino-hive", descriptor);
+ }
+
+ @Test
+ void testStoreCatalog_sessionCatalog_storesInMemory() throws CatalogException {
+ CatalogDescriptor descriptor = descriptorWithType(SESSION_CATALOG_TYPE);
+
+ sessionCatalogStore.storeCatalog("hive", descriptor);
+
+ verify(memoryCatalogStore).storeCatalog("hive", descriptor);
+ verify(gravitinoCatalogStore, never()).storeCatalog("hive", descriptor);
+ }
+
+ @Test
+ void testStoreCatalog_missingCatalogType_throwsCatalogException() {
+ CatalogDescriptor descriptor = CatalogDescriptor.of("unknown", new Configuration());
+
+ Assertions.assertThrows(
+ CatalogException.class, () -> sessionCatalogStore.storeCatalog("unknown", descriptor));
+ }
+
+ // -------------------------------------------------------------------------
+ // removeCatalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testRemoveCatalog_catalogInMemory_removesFromMemory() throws CatalogException {
+ when(memoryCatalogStore.contains("hive")).thenReturn(true);
+
+ sessionCatalogStore.removeCatalog("hive", false);
+
+ verify(memoryCatalogStore).removeCatalog("hive", false);
+ verify(gravitinoCatalogStore, never()).removeCatalog("hive", false);
+ }
+
+ @Test
+ void testRemoveCatalog_catalogNotInMemory_removesFromGravitino() throws CatalogException {
+ when(memoryCatalogStore.contains("gravitino-hive")).thenReturn(false);
+
+ sessionCatalogStore.removeCatalog("gravitino-hive", false);
+
+ verify(gravitinoCatalogStore).removeCatalog("gravitino-hive", false);
+ verify(memoryCatalogStore, never()).removeCatalog("gravitino-hive", false);
+ }
+
+ // -------------------------------------------------------------------------
+ // getCatalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testGetCatalog_catalogInMemory_returnsFromMemory() throws CatalogException {
+ CatalogDescriptor expected = descriptorWithType(SESSION_CATALOG_TYPE);
+ when(memoryCatalogStore.getCatalog("hive")).thenReturn(Optional.of(expected));
+
+ Optional<CatalogDescriptor> result = sessionCatalogStore.getCatalog("hive");
+
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(expected, result.get());
+ verify(gravitinoCatalogStore, never()).getCatalog("hive");
+ }
+
+ @Test
+ void testGetCatalog_catalogNotInMemory_returnsFromGravitino() throws CatalogException {
+ CatalogDescriptor expected = descriptorWithType(GRAVITINO_CATALOG_TYPE);
+ when(memoryCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.empty());
+ when(gravitinoCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.of(expected));
+
+ Optional<CatalogDescriptor> result = sessionCatalogStore.getCatalog("gravitino-hive");
+
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(expected, result.get());
+ verify(gravitinoCatalogStore).getCatalog("gravitino-hive");
+ }
+
+ // -------------------------------------------------------------------------
+ // listCatalogs
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testListCatalogs_returnsCombinedSet() throws CatalogException {
+ when(memoryCatalogStore.listCatalogs()).thenReturn(ImmutableSet.of("hive"));
+ when(gravitinoCatalogStore.listCatalogs()).thenReturn(ImmutableSet.of("gravitino-hive"));
+
+ Set<String> result = sessionCatalogStore.listCatalogs();
+
+ Assertions.assertEquals(ImmutableSet.of("hive", "gravitino-hive"), result);
+ }
+
+ @Test
+ void testListCatalogs_gravitinoThrows_wrapsCatalogException() {
+ when(memoryCatalogStore.listCatalogs()).thenReturn(ImmutableSet.of("hive"));
+ when(gravitinoCatalogStore.listCatalogs())
+ .thenThrow(new RuntimeException("Gravitino unavailable"));
+
+ Assertions.assertThrows(CatalogException.class, () -> sessionCatalogStore.listCatalogs());
+ }
+
+ // -------------------------------------------------------------------------
+ // contains
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testContains_catalogInMemory_returnsTrue() {
+ when(memoryCatalogStore.contains("hive")).thenReturn(true);
+
+ Assertions.assertTrue(sessionCatalogStore.contains("hive"));
+ }
+
+ @Test
+ void testContains_catalogInGravitino_returnsTrue() {
+ when(memoryCatalogStore.contains("gravitino-hive")).thenReturn(false);
+ when(gravitinoCatalogStore.contains("gravitino-hive")).thenReturn(true);
+
+ Assertions.assertTrue(sessionCatalogStore.contains("gravitino-hive"));
+ }
+
+ @Test
+ void testContains_catalogInNeither_returnsFalse() {
+ when(memoryCatalogStore.contains("hive")).thenReturn(false);
+ when(gravitinoCatalogStore.contains("hive")).thenReturn(false);
+
+ Assertions.assertFalse(sessionCatalogStore.contains("hive"));
+ }
+
+ // -------------------------------------------------------------------------
+ // helpers
+ // -------------------------------------------------------------------------
+
+ private static CatalogDescriptor descriptorWithType(String type) {
+ Configuration config = new Configuration();
+ config.setString(CommonCatalogOptions.CATALOG_TYPE.key(), type);
+ return CatalogDescriptor.of(type, config);
+ }
+}
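
The unit tests above pin down a memory-first routing contract: store by catalog type, look up and drop from the in-memory store first, and list the union of both stores. The following is a minimal sketch of that contract only, not the connector's actual `GravitinoSessionCatalogStore`. It assumes plain maps in place of Flink's `CatalogStore` implementations, and the class name `SessionRoutingSketch`, its method names, and the single managed type `gravitino-hive` are hypothetical choices made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for memory-first catalog routing; maps name -> catalog type. */
class SessionRoutingSketch {
  // Assumption: only this one type is treated as Gravitino-managed in the sketch.
  private static final Set<String> MANAGED_TYPES = Collections.singleton("gravitino-hive");

  private final Map<String, String> memoryStore = new HashMap<>();
  private final Map<String, String> gravitinoStore = new HashMap<>();

  /** Managed types go to the Gravitino-backed map; everything else stays in memory. */
  void store(String name, String type) {
    if (type == null) {
      throw new IllegalArgumentException("Catalog type is required for " + name);
    }
    if (MANAGED_TYPES.contains(type)) {
      gravitinoStore.put(name, type);
    } else {
      memoryStore.put(name, type);
    }
  }

  /** The in-memory entry wins; fall back to the Gravitino-backed map otherwise. */
  Optional<String> get(String name) {
    String type = memoryStore.get(name);
    return type != null ? Optional.of(type) : Optional.ofNullable(gravitinoStore.get(name));
  }

  /** Removes from memory if present there, otherwise from the Gravitino-backed map. */
  void remove(String name) {
    if (memoryStore.containsKey(name)) {
      memoryStore.remove(name);
    } else {
      gravitinoStore.remove(name);
    }
  }

  /** Union of the catalog names held by both maps. */
  Set<String> list() {
    Set<String> names = new LinkedHashSet<>(memoryStore.keySet());
    names.addAll(gravitinoStore.keySet());
    return names;
  }

  boolean contains(String name) {
    return memoryStore.containsKey(name) || gravitinoStore.containsKey(name);
  }
}
```

Under this sketch, storing the same name with both a managed and an unmanaged type leaves two entries; `get` returns the in-memory one until it is removed, after which the Gravitino-backed entry becomes visible again. That mirrors the name-conflict rule described in the commit message, though the real store may differ in details such as exception types.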