This is an automated email from the ASF dual-hosted git repository.
jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 9c7fc94ffb [#10656] improvement(core): Centralize schema persistence
error messages (#10676)
9c7fc94ffb is described below
commit 9c7fc94ffbb2526bb9036751e90b2b1d8de219c9
Author: Babu Mahesh <[email protected]>
AuthorDate: Wed Apr 8 11:32:06 2026 +0530
[#10656] improvement(core): Centralize schema persistence error messages
(#10676)
### What changes were proposed in this pull request?
Centralizes schema persistence error messages and adds automatic
rollback functionality when schema metadata fails to
persist to Gravitino's store after successful creation in the underlying
catalog. The changes include:
1. Added centralized error message constants in
OperationDispatcher.FormattedErrorMessages for schema persistence
failures
2. Implemented rollbackEntityCreation() method to automatically delete
the schema from the underlying catalog when
Gravitino store persistence fails
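3. Enhanced SchemaOperationDispatcher.createSchema() to attempt a rollback
when the store write fails, and to throw a GravitinoRuntimeException whose
message states whether the rollback succeeded (the original store error is
kept as the exception's cause)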
### Why are the changes needed?
When a schema is successfully created in the underlying catalog but
fails to persist to Gravitino's metadata store (e.g., due to network
issues or storage failures), the two become inconsistent: the schema
exists in the catalog but not in Gravitino's records. This leads to:
- Orphaned resources that Gravitino cannot manage
- Confusing state where subsequent operations may fail unexpectedly
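The automatic rollback mechanism restores consistency between the
underlying catalog and Gravitino's metadata store, preventing orphaned
resources and maintaining system integrity. If the rollback itself fails,
the returned error explicitly warns that the schema may still exist in the
underlying catalog without being tracked by Gravitino.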
### How was this patch tested?
New unit tests in TestSchemaOperationDispatcher cover both the
successful-rollback and failed-rollback paths, and verify that the
original store error is preserved as the cause of the exception returned
to callers.
Additionally, the web UI was checked to display an error message popup
when these failures occur (see the screenshots below).
<img width="2536" height="1353" alt="Screenshot From 2026-04-05 00-49-11" src="https://github.com/user-attachments/assets/65b680f1-95b7-44a1-aeef-ca020d3c3549" />
<img width="1592" height="1034" alt="image"
src="https://github.com/user-attachments/assets/1b6101a3-032e-4881-b6c0-730cb20f70df"
/>
Fix: #10656
---
.../gravitino/catalog/OperationDispatcher.java | 50 +++++++++++++
.../catalog/SchemaOperationDispatcher.java | 28 +++++--
.../catalog/TestSchemaOperationDispatcher.java | 86 ++++++++++++++++++----
3 files changed, 145 insertions(+), 19 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
index 006e05c64b..5e10e101bb 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
@@ -34,6 +34,8 @@ import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.messaging.TopicChange;
@@ -129,6 +131,42 @@ public abstract class OperationDispatcher {
}
}
+ /**
+ * Attempts to rollback entity creation in the underlying catalog.
+ *
+ * <p>This method is used when entity metadata fails to persist to the
Gravitino store after
+ * successful creation in the underlying catalog. It attempts to rollback
the creation to maintain
+ * consistency between the catalog and Gravitino's metadata store.
+ *
+ * @param catalogIdent The identifier of the catalog.
+ * @param entityIdent The identifier of the entity to rollback.
+ * @param entityType The type of entity (e.g., "schema", "table", "topic")
for logging purposes.
+ * @param rollbackOperation The operation to perform rollback. This should
be a function that
+ * takes a CatalogWrapper and performs the necessary deletion operation
(e.g., dropSchema,
+ * dropTable, etc.).
+ * @param <R> The return type of the rollback operation (usually Boolean for
drop operations).
+ * @throws GravitinoRuntimeException If rollback fails. The exception will
contain details about
+ * the rollback failure.
+ */
+ protected <R> void rollbackEntityCreation(
+ NameIdentifier catalogIdent,
+ NameIdentifier entityIdent,
+ String entityType,
+ ThrowableFunction<CatalogManager.CatalogWrapper, R> rollbackOperation) {
+ try {
+ LOG.warn(
+ "Failed to persist {} metadata to Gravitino store for: {}. "
+ + "Attempting to rollback {} creation in underlying catalog to
maintain consistency.",
+ entityType,
+ entityIdent,
+ entityType);
+ doWithCatalog(catalogIdent, rollbackOperation,
NoSuchCatalogException.class);
+ LOG.info("Successfully rolled back {} creation for: {}", entityType,
entityIdent);
+ } catch (Exception ex) {
+ throw ex;
+ }
+ }
+
protected Set<String> getHiddenPropertyNames(
NameIdentifier catalogIdent,
ThrowableFunction<HasPropertyMetadata, PropertiesMetadata> provider,
@@ -289,5 +327,17 @@ public abstract class OperationDispatcher {
+ "identifier in the property {}, this is unexpected if this
object is created by "
+ "Gravitino. This might be due to some operations that are not
performed through Gravitino. "
+ "With this situation the returned object will not contain the
metadata from Gravitino";
+
+ static final String ENTITY_PERSIST_FAILURE_WITH_ROLLBACK_FAILURE =
+ "Failed to persist schema metadata to Gravitino store for: %s. "
+ + "Additionally, rollback of schema creation in underlying catalog
failed. "
+ + "The schema may exist in the underlying catalog but is not
tracked by Gravitino.";
+
+ static final String ENTITY_PERSIST_FAILURE_WITH_ROLLBACK_SUCCESS =
+ "Failed to persist schema metadata to Gravitino store for: %s. "
+ + "Schema creation in underlying catalog has been rolled back.";
+
+ static final String ENTITY_SCHEMA_ROLLBACK_FAILED =
+ "Failed to rollback schema creation in underlying catalog for: {}.";
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index b61c99e0d7..10d290f8b1 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog;
import static org.apache.gravitino.Entity.EntityType.SCHEMA;
+import static
org.apache.gravitino.catalog.OperationDispatcher.FormattedErrorMessages;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
@@ -33,6 +34,7 @@ import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -157,12 +159,26 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
store.put(schemaEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident,
e);
- return EntityCombinedSchema.of(schema)
- .withHiddenProperties(
- getHiddenPropertyNames(
- catalogIdent,
- HasPropertyMetadata::schemaPropertiesMetadata,
- schema.properties()));
+
+ // Attempt to rollback the schema creation in the underlying
catalog
+ try {
+ rollbackEntityCreation(
+ catalogIdent,
+ ident,
+ "schema",
+ catalogWrapper ->
+ catalogWrapper.doWithSchemaOps(
+ schemas -> schemas.dropSchema(ident, true /* cascade
*/)));
+ } catch (GravitinoRuntimeException ex) {
+ // Rollback failed - combine original exception with rollback
failure
+ LOG.error(FormattedErrorMessages.ENTITY_SCHEMA_ROLLBACK_FAILED,
ident, ex);
+ throw new GravitinoRuntimeException(
+ e,
FormattedErrorMessages.ENTITY_PERSIST_FAILURE_WITH_ROLLBACK_FAILURE, ident);
+ }
+
+ // Rollback succeeded - throw original exception with rollback
success context
+ throw new GravitinoRuntimeException(
+ e,
FormattedErrorMessages.ENTITY_PERSIST_FAILURE_WITH_ROLLBACK_SUCCESS, ident);
}
// Merge both the metadata from catalog operation and the metadata
from entity store.
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java
index f57a8c4038..bd32c4c735 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaOperationDispatcher.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
@@ -46,6 +48,7 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
@@ -114,25 +117,82 @@ public class TestSchemaOperationDispatcher extends
TestOperationDispatcher {
.filter(s -> s.name().equals("schema1"))
.findFirst();
Assertions.assertTrue(ident1.isPresent());
+ }
- // Test when the entity store failed to put the schema entity
- doThrow(new IOException()).when(entityStore).put(any(), anyBoolean());
- NameIdentifier schemaIdent2 = NameIdentifier.of(ns, "schema2");
- Schema schema2 = dispatcher.createSchema(schemaIdent2, "comment", props);
+ @Test
+ public void testCreateSchemaFailsWhenStorePutFailsAndRollbackSucceeds()
throws IOException {
+ Namespace ns = Namespace.of(metalake, catalog);
+ NameIdentifier schemaIdent = NameIdentifier.of(ns,
"schema_rollback_success");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
- // Check if the created Schema's field values are correct
- Assertions.assertEquals("schema2", schema2.name());
- Assertions.assertEquals("comment", schema2.comment());
- testProperties(props, schema2.properties());
+ // Spy on the dispatcher to verify rollbackEntityCreation is called
+ SchemaOperationDispatcher spyDispatcher = spy(dispatcher);
- // Check if the Schema entity is stored in the EntityStore
- Assertions.assertFalse(entityStore.exists(schemaIdent2, SCHEMA));
+ // Mock store.put() to fail
+ reset(entityStore);
+ doThrow(new IOException("Store failure due to network issue"))
+ .when(entityStore)
+ .put(any(), anyBoolean());
+
+ // createSchema should fail with GravitinoRuntimeException after
successful rollback
+ GravitinoRuntimeException exception =
+ Assertions.assertThrows(
+ GravitinoRuntimeException.class,
+ () -> spyDispatcher.createSchema(schemaIdent, "comment", props));
+
+ // Verify the original cause is preserved
+ Assertions.assertEquals(
+ "Store failure due to network issue",
exception.getCause().getMessage());
+
+ // Verify that rollbackEntityCreation was called
+ verify(spyDispatcher).rollbackEntityCreation(any(), eq(schemaIdent),
eq("schema"), any());
+
+ // Verify that the schema entity is NOT in the store
+ Assertions.assertFalse(entityStore.exists(schemaIdent, SCHEMA));
Assertions.assertThrows(
NoSuchEntityException.class,
- () -> entityStore.get(schemaIdent2, SCHEMA, SchemaEntity.class));
+ () -> entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class));
+ }
- // Audit info is gotten from the catalog, not from the entity store
- Assertions.assertEquals("test", schema2.auditInfo().creator());
+ @Test
+ public void testCreateSchemaFailsWhenStorePutFailsAndRollbackFails() throws
Exception {
+ Namespace ns = Namespace.of(metalake, catalog);
+ NameIdentifier schemaIdent = NameIdentifier.of(ns, "schema_rollback_fail");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+
+ // Spy on the dispatcher to make rollbackEntityCreation throw an exception
+ SchemaOperationDispatcher spyDispatcher = spy(dispatcher);
+
+ // Make rollbackEntityCreation throw an exception to simulate rollback
failure
+ doThrow(new GravitinoRuntimeException("Rollback failed - connection lost"))
+ .when(spyDispatcher)
+ .rollbackEntityCreation(any(), any(), any(), any());
+
+ // Mock store.put() to fail after successful schema creation
+ reset(entityStore);
+ IOException storeException = new IOException("Store failure due to network
issue");
+ doThrow(storeException).when(entityStore).put(any(), anyBoolean());
+
+ // createSchema should fail with GravitinoRuntimeException when rollback
fails
+ GravitinoRuntimeException exception =
+ Assertions.assertThrows(
+ GravitinoRuntimeException.class,
+ () -> spyDispatcher.createSchema(schemaIdent, "comment", props));
+
+ // Verify the original cause is preserved (the IOException from store.put)
+ Assertions.assertEquals(
+ "Store failure due to network issue",
exception.getCause().getMessage());
+
+ // Verify that rollbackEntityCreation was called
+ Assertions.assertThrows(
+ Exception.class,
+ () -> spyDispatcher.rollbackEntityCreation(any(), eq(schemaIdent),
eq("schema"), any()));
+
+ // Verify that the schema entity is NOT in the store
+ Assertions.assertFalse(entityStore.exists(schemaIdent, SCHEMA));
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () -> entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class));
}
@Test