This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d448cb1683 [#10737] fix(core): Avoid blocking dropCatalog on imported schemas (#10738)
d448cb1683 is described below

commit d448cb16837bfa5e55e35735c4000b133c2241b3
Author: Yuhui <[email protected]>
AuthorDate: Wed Apr 22 21:16:55 2026 +0800

    [#10737] fix(core): Avoid blocking dropCatalog on imported schemas (#10738)
    
    ### What changes were proposed in this pull request?
    
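    Fix schema classification in `dropCatalog(force = false)` so imported
    schemas do not block catalog deletion. A schema that still exists in the
    underlying catalog is now treated as user-created only if its properties
    carry a Gravitino `StringIdentifier`, or if its properties are null or
    empty (its origin cannot be determined, so it is conservatively treated
    as user-created).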
    
    ### Why are the changes needed?
    
    Imported schemas can be written into the entity store during metadata
    synchronization. The previous check treated every stored schema that still
    exists in the underlying catalog as user-created, so imported schemas were
    misclassified as user-created schemas.
    
    As a result, `dropCatalog(force = false)` failed with
    `NonEmptyCatalogException` even when the only remaining schemas had been
    imported from the external catalog.
    
    Fix: #10737
    
    ### Does this PR introduce _any_ user-facing change?
    
    `dropCatalog(force = false)` no longer incorrectly fails when only
    imported schemas remain.
    
    ### How was this patch tested?
    
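    - Added unit tests in `TestCatalogManager` covering imported schemas, a
      schema that disappears between `listSchemas` and `loadSchema`, and
      propagation of unexpected errors during schema classification.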
---
 .../apache/gravitino/catalog/CatalogManager.java   |  36 ++++-
 .../gravitino/catalog/TestCatalogManager.java      | 163 +++++++++++++++++++++
 2 files changed, 196 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 7ccbd3cb3d..b7d540e8d7 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -73,6 +73,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.CatalogOperations;
@@ -87,6 +88,7 @@ import 
org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptyCatalogException;
 import org.apache.gravitino.exceptions.NonEmptyEntityException;
 import org.apache.gravitino.file.FilesetCatalog;
@@ -877,9 +879,37 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     Set<String> availableSchemaNames =
         
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
 
-    // some schemas are dropped externally, but still exist in the entity 
store, those schemas are
-    // invalid
-    return 
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
+    // Some schemas are dropped externally but still exist in the entity store 
— those are invalid.
+    // Among schemas that exist in the underlying catalog, only those created 
via Gravitino carry a
+    // StringIdentifier in their external properties; imported schemas do not.
+    for (SchemaEntity schemaEntity : schemaEntities) {
+      if (!availableSchemaNames.contains(schemaEntity.name())) {
+        continue;
+      }
+
+      try {
+        Schema schema =
+            catalogWrapper.doWithSchemaOps(ops -> 
ops.loadSchema(schemaEntity.nameIdentifier()));
+        Map<String, String> props = schema.properties();
+        // If the backend cannot store a StringIdentifier (null or empty 
properties, e.g. MySQL
+        // which does not support schema comments), we cannot tell whether the 
schema was created
+        // by Gravitino or imported. Be conservative and treat it as 
user-created to avoid
+        // accidental data loss.
+        // Only skip a schema when properties are non-null, non-empty, and 
contain no
+        // StringIdentifier — the reliable signal that the schema was imported 
from an external
+        // catalog on a backend that does support identifier storage.
+        if (props == null || props.isEmpty() || 
StringIdentifier.fromProperties(props) != null) {
+          return true;
+        }
+      } catch (NoSuchSchemaException ex) {
+        // A race between listSchemas and loadSchema is expected; treat as 
non-user-created.
+        LOG.debug(
+            "Schema {} no longer exists while checking whether it is 
user-created",
+            schemaEntity.nameIdentifier());
+      }
+    }
+
+    return false;
   }
 
   /**
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java 
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index 6f5148b4c5..7f64423038 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -46,12 +46,14 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.CatalogInUseException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
@@ -589,6 +591,167 @@ public class TestCatalogManager {
     
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
   }
 
+  @Test
+  public void testDropCatalogSkipsImportedSchemas() throws Exception {
+    NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+    Map<String, String> props =
+        ImmutableMap.of(
+            "provider",
+            "test",
+            PROPERTY_KEY1,
+            "value1",
+            PROPERTY_KEY2,
+            "value2",
+            PROPERTY_KEY5_PREFIX + "1",
+            "value3");
+    String comment = "comment";
+
+    Catalog catalog =
+        catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
+    Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+    Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+    CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+    SchemaEntity importedSchemaEntity =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("imported_schema")
+            .withNamespace(Namespace.of("metalake", "test41"))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+    entityStore.put(importedSchemaEntity);
+
+    Schema importedSchema = Mockito.mock(Schema.class);
+    // Non-empty properties without StringIdentifier simulate an imported 
schema on a backend
+    // that supports property storage (e.g., Hive, Iceberg) but did not create 
this schema
+    // via Gravitino.
+    Mockito.doReturn(ImmutableMap.of("owner", 
"external")).when(importedSchema).properties();
+    CatalogManager.CatalogWrapper wrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Capability capability = Mockito.mock(Capability.class);
+    CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not 
managed");
+    Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+    Mockito.doReturn(catalog).when(wrapper).catalog();
+    Mockito.doReturn(capability).when(wrapper).capabilities();
+    Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+    Mockito.doReturn(
+            new NameIdentifier[] {NameIdentifier.of("metalake", "test41", 
"imported_schema")})
+        .doReturn(importedSchema)
+        .when(wrapper)
+        .doWithSchemaOps(any());
+
+    // Imported schema (no StringIdentifier in external catalog properties) 
should not block drop.
+    Assertions.assertTrue(catalogManager.dropCatalog(ident));
+  }
+
+  @Test
+  public void testDropCatalogIgnoresMissingSchema() throws Exception {
+    NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+    Map<String, String> props =
+        ImmutableMap.of(
+            "provider",
+            "test",
+            PROPERTY_KEY1,
+            "value1",
+            PROPERTY_KEY2,
+            "value2",
+            PROPERTY_KEY5_PREFIX + "1",
+            "value3");
+    String comment = "comment";
+
+    Catalog catalog =
+        catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
+    Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+    Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+    CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+    SchemaEntity schemaEntity =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("default")
+            .withNamespace(Namespace.of("metalake", "test41"))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+    entityStore.put(schemaEntity);
+
+    CatalogManager.CatalogWrapper wrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Capability capability = Mockito.mock(Capability.class);
+    CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not 
managed");
+    Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+    Mockito.doReturn(catalog).when(wrapper).catalog();
+    Mockito.doReturn(capability).when(wrapper).capabilities();
+    Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+    Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake", 
"test41", "default")})
+        .doThrow(new NoSuchSchemaException("Schema not found"))
+        .when(wrapper)
+        .doWithSchemaOps(any());
+
+    // Schema disappearing between listSchemas and loadSchema should not block 
drop.
+    Assertions.assertTrue(catalogManager.dropCatalog(ident));
+  }
+
+  @Test
+  public void testDropCatalogFailsOnSchemaClassificationError() throws 
Exception {
+    NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+    Map<String, String> props =
+        ImmutableMap.of(
+            "provider",
+            "test",
+            PROPERTY_KEY1,
+            "value1",
+            PROPERTY_KEY2,
+            "value2",
+            PROPERTY_KEY5_PREFIX + "1",
+            "value3");
+    String comment = "comment";
+
+    Catalog catalog =
+        catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
+    Mockito.doCallRealMethod().when(catalogManager).loadCatalogAndWrap(ident);
+    Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+    CatalogEntity catalogEntity = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    FieldUtils.writeField(catalog, "entity", catalogEntity, true);
+
+    SchemaEntity schemaEntity =
+        SchemaEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("test_schema1")
+            .withNamespace(Namespace.of("metalake", "test41"))
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+    entityStore.put(schemaEntity);
+
+    CatalogManager.CatalogWrapper wrapper = 
Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Capability capability = Mockito.mock(Capability.class);
+    CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not 
managed");
+    Mockito.doReturn(wrapper).when(catalogManager).loadCatalogAndWrap(ident);
+    Mockito.doReturn(catalog).when(wrapper).catalog();
+    Mockito.doReturn(capability).when(wrapper).capabilities();
+    Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+    Mockito.doReturn(new NameIdentifier[] {NameIdentifier.of("metalake", 
"test41", "test_schema1")})
+        .doThrow(new RuntimeException("Failed connect"))
+        .when(wrapper)
+        .doWithSchemaOps(any());
+
+    // Unexpected errors during schema classification should propagate 
(fail-closed).
+    RuntimeException ex =
+        Assertions.assertThrows(RuntimeException.class, () -> 
catalogManager.dropCatalog(ident));
+    Assertions.assertTrue(ex.getCause().getMessage().contains("Failed 
connect"));
+  }
+
   @Test
   public void testForceDropCatalog() throws Exception {
     NameIdentifier ident = NameIdentifier.of("metalake", "test41");

Reply via email to