This is an automated email from the ASF dual-hosted git repository.
yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d448cb1683 [#10737] fix(core): Avoid blocking dropCatalog on imported
schemas (#10738)
d448cb1683 is described below
commit d448cb16837bfa5e55e35735c4000b133c2241b3
Author: Yuhui <[email protected]>
AuthorDate: Wed Apr 22 21:16:55 2026 +0800
[#10737] fix(core): Avoid blocking dropCatalog on imported schemas (#10738)
### What changes were proposed in this pull request?
Fix schema classification in `dropCatalog(force = false)` so imported
schemas do not block catalog deletion.
### Why are the changes needed?
Imported schemas can be written into the entity store during metadata
synchronization and later be misclassified as user-created schemas.
That makes `dropCatalog(force = false)` fail with
`NonEmptyCatalogException` even though the remaining schema was imported
from the external catalog.
Fix: #10737
### Does this PR introduce _any_ user-facing change?
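`dropCatalog(force = false)` no longer incorrectly fails when only
imported schemas remain. Schemas whose backend returns null or empty
properties are still treated as user-created and continue to block the
drop, because their origin cannot be determined.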
### How was this patch tested?
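- Added unit tests in `TestCatalogManager` covering three cases: an
imported schema that does not block the drop, a schema that disappears
between `listSchemas` and `loadSchema`, and an unexpected error while
loading a schema, which propagates to the caller.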
---
.../apache/gravitino/catalog/CatalogManager.java | 36 ++++-
.../gravitino/catalog/TestCatalogManager.java | 163 +++++++++++++++++++++
2 files changed, 196 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 7ccbd3cb3d..b7d540e8d7 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -73,6 +73,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
@@ -87,6 +88,7 @@ import
org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NonEmptyEntityException;
import org.apache.gravitino.file.FilesetCatalog;
@@ -877,9 +879,37 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
Set<String> availableSchemaNames =
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
- // some schemas are dropped externally, but still exist in the entity
store, those schemas are
- // invalid
- return
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
+ // Some schemas are dropped externally but still exist in the entity store
— those are invalid.
+ // Among schemas that exist in the underlying catalog, only those created
via Gravitino carry a
+ // StringIdentifier in their external properties; imported schemas do not.
+ for (SchemaEntity schemaEntity : schemaEntities) {
+ if (!availableSchemaNames.contains(schemaEntity.name())) {
+ continue;
+ }
+
+ try {
+ Schema schema =
+ catalogWrapper.doWithSchemaOps(ops ->
ops.loadSchema(schemaEntity.nameIdentifier()));
+ Map<String, String> props = schema.properties();
+ // If the backend cannot store a StringIdentifier (null or empty
properties, e.g. MySQL
+ // which does not support schema comments), we cannot tell whether the
schema was created
+ // by Gravitino or imported. Be conservative and treat it as
user-created to avoid
+ // accidental data loss.
+ // Only skip a schema when properties are non-null, non-empty, and
contain no
+ // StringIdentifier — the reliable signal that the schema was imported
from an external
+ // catalog on a backend that does support identifier storage.
+ if (props == null || props.isEmpty() ||
StringIdentifier.fromProperties(props) != null) {
+ return true;
+ }
+ } catch (NoSuchSchemaException ex) {
+ // A race between listSchemas and loadSchema is expected; treat as
non-user-created.
+ LOG.debug(
+ "Schema {} no longer exists while checking whether it is
user-created",
+ schemaEntity.nameIdentifier());
+ }
+ }
+
+ return false;
}
/**
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index 6f5148b4c5..7f64423038 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -46,12 +46,14 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.connector.capability.CapabilityResult;
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.CatalogInUseException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.BaseMetalake;
@@ -589,6 +591,167 @@ public class TestCatalogManager {
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
}
+ @Test
+ public void testDropCatalogSkipsImportedSchemas() throws Exception {
+ NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+ Map<String, String> props =
+ ImmutableMap.of(
+ "provider",
+ "test",
+ PROPERTY_KEY1,
+ "value1",
+ PROPERTY_KEY2,
+ "value2",
+ PROPERTY_KEY5_PREFIX + "1",
+ "value3");
+ String comment = "comment";
+
+ Catalog catalog =
+ catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider,
comment, props);
+ Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+ Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+ CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG,
CatalogEntity.class);
+ FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+ SchemaEntity importedSchemaEntity =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("imported_schema")
+ .withNamespace(Namespace.of("metalake", "test41"))
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ entityStore.put(importedSchemaEntity);
+
+ Schema importedSchema = Mockito.mock(Schema.class);
+ // Non-empty properties without StringIdentifier simulate an imported
schema on a backend
+ // that supports property storage (e.g., Hive, Iceberg) but did not create
this schema
+ // via Gravitino.
+ Mockito.doReturn(ImmutableMap.of("owner",
"external")).when(importedSchema).properties();
+ CatalogManager.CatalogWrapper wrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
+ Capability capability = Mockito.mock(Capability.class);
+ CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not
managed");
+ Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+ Mockito.doReturn(catalog).when(wrapper).catalog();
+ Mockito.doReturn(capability).when(wrapper).capabilities();
+ Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+ Mockito.doReturn(
+ new NameIdentifier[] {NameIdentifier.of("metalake", "test41",
"imported_schema")})
+ .doReturn(importedSchema)
+ .when(wrapper)
+ .doWithSchemaOps(any());
+
+ // Imported schema (no StringIdentifier in external catalog properties)
should not block drop.
+ Assertions.assertTrue(catalogManager.dropCatalog(ident));
+ }
+
+ @Test
+ public void testDropCatalogIgnoresMissingSchema() throws Exception {
+ NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+ Map<String, String> props =
+ ImmutableMap.of(
+ "provider",
+ "test",
+ PROPERTY_KEY1,
+ "value1",
+ PROPERTY_KEY2,
+ "value2",
+ PROPERTY_KEY5_PREFIX + "1",
+ "value3");
+ String comment = "comment";
+
+ Catalog catalog =
+ catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider,
comment, props);
+ Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+ Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+ CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG,
CatalogEntity.class);
+ FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+ SchemaEntity schemaEntity =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("default")
+ .withNamespace(Namespace.of("metalake", "test41"))
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ entityStore.put(schemaEntity);
+
+ CatalogManager.CatalogWrapper wrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
+ Capability capability = Mockito.mock(Capability.class);
+ CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not
managed");
+ Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+ Mockito.doReturn(catalog).when(wrapper).catalog();
+ Mockito.doReturn(capability).when(wrapper).capabilities();
+ Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+ Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake",
"test41", "default")})
+ .doThrow(new NoSuchSchemaException("Schema not found"))
+ .when(wrapper)
+ .doWithSchemaOps(any());
+
+ // Schema disappearing between listSchemas and loadSchema should not block
drop.
+ Assertions.assertTrue(catalogManager.dropCatalog(ident));
+ }
+
+ @Test
+ public void testDropCatalogFailsOnSchemaClassificationError() throws
Exception {
+ NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+ Map<String, String> props =
+ ImmutableMap.of(
+ "provider",
+ "test",
+ PROPERTY_KEY1,
+ "value1",
+ PROPERTY_KEY2,
+ "value2",
+ PROPERTY_KEY5_PREFIX + "1",
+ "value3");
+ String comment = "comment";
+
+ Catalog catalog =
+ catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider,
comment, props);
+ Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+ Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+ CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG,
CatalogEntity.class);
+ FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+ SchemaEntity schemaEntity =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("test_schema1")
+ .withNamespace(Namespace.of("metalake", "test41"))
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+ entityStore.put(schemaEntity);
+
+ CatalogManager.CatalogWrapper wrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
+ Capability capability = Mockito.mock(Capability.class);
+ CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not
managed");
+ Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+ Mockito.doReturn(catalog).when(wrapper).catalog();
+ Mockito.doReturn(capability).when(wrapper).capabilities();
+ Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+ Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake",
"test41", "test_schema1")})
+ .doThrow(new RuntimeException("Failed connect"))
+ .when(wrapper)
+ .doWithSchemaOps(any());
+
+ // Unexpected errors during schema classification should propagate
(fail-closed).
+ RuntimeException ex =
+ Assertions.assertThrows(RuntimeException.class, () ->
catalogManager.dropCatalog(ident));
+ Assertions.assertTrue(ex.getCause().getMessage().contains("Failed
connect"));
+ }
+
@Test
public void testForceDropCatalog() throws Exception {
NameIdentifier ident = NameIdentifier.of("metalake", "test41");