justinmclean opened a new issue, #10601:
URL: https://github.com/apache/gravitino/issues/10601
### What would you like to be improved?
Enabling or disabling a metalake is not atomic. In
`MetalakeManager.enableMetalake` and the corresponding disable path, Gravitino
first updates the metalake-level in-use flag in storage, then updates the
propagated metalake-in-use property on each catalog.
If one of the catalog updates fails, the API returns an error, but the
metalake-level flag has already been committed. This leaves the system in a
partially updated state, where the metalake is marked as enabled/disabled,
while some catalogs still have the old metalake-in-use value.
### How should we improve?
Make metalake enable/disable behave transactionally from the caller’s
perspective. A few workable options are:
- Update all catalogs first and only persist the metalake-level flag if
every catalog update succeeds.
- Add rollback logic so a failed catalog propagation restores the
metalake-level flag and any catalogs already updated.

If full atomicity is not feasible, at minimum do not persist the new
metalake state unless every catalog update has succeeded.
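A minimal sketch of the first two options combined: propagate to catalogs first, persist the metalake-level flag last, and compensate for any catalogs already updated if a later one fails. `MetalakeToggleSketch` and `CatalogUpdater` are hypothetical names used only for illustration, not Gravitino's API. The sketch also assumes every catalog held the previous value before the call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the enable/disable path; not Gravitino's actual API. */
final class MetalakeToggleSketch {

  /** Hypothetical hook that writes the propagated in-use value to one catalog. */
  interface CatalogUpdater {
    void setMetalakeInUse(String catalog, boolean inUse);
  }

  private final CatalogUpdater updater;
  private final List<String> catalogs;
  private boolean metalakeInUse; // stands in for the persisted metalake-level flag

  MetalakeToggleSketch(CatalogUpdater updater, List<String> catalogs, boolean initialInUse) {
    this.updater = updater;
    this.catalogs = catalogs;
    this.metalakeInUse = initialInUse;
  }

  boolean metalakeInUse() {
    return metalakeInUse;
  }

  void setInUse(boolean target) {
    boolean previous = metalakeInUse;
    if (previous == target) {
      return; // nothing to propagate
    }
    List<String> updated = new ArrayList<>();
    try {
      // Step 1: propagate to every catalog before touching the metalake flag.
      for (String catalog : catalogs) {
        updater.setMetalakeInUse(catalog, target);
        updated.add(catalog);
      }
    } catch (RuntimeException e) {
      // Step 2: best-effort compensation for catalogs that were already updated.
      for (String catalog : updated) {
        try {
          updater.setMetalakeInUse(catalog, previous);
        } catch (RuntimeException rollbackFailure) {
          e.addSuppressed(rollbackFailure); // keep the original failure as the cause
        }
      }
      throw e;
    }
    // Step 3: commit the metalake-level flag only after all catalogs succeeded.
    metalakeInUse = target;
  }
}
```

With an updater that fails on the second of two catalogs, `setInUse(true)` rethrows the failure, the flag stays `false`, and the first catalog is restored to `false`. The compensation step can itself fail, so this narrows the inconsistency window rather than providing a true storage-level transaction.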
Here's a unit test to help:
```java
@Test
public void testEnableMetalakeShouldRollbackWhenCatalogPropagationFails()
    throws IllegalAccessException, IOException {
  NameIdentifier ident = NameIdentifier.of("partialEnableMetalake");
  CatalogManager originalCatalogManager =
      (CatalogManager) FieldUtils.readField(GravitinoEnv.getInstance(), "catalogManager", true);
  CatalogManager mockCatalogManager = Mockito.mock(CatalogManager.class);

  // Create a metalake with two catalogs.
  metalakeManager.createMetalake(ident, "comment", ImmutableMap.of());
  entityStore.put(createCatalogEntity(100L, ident.name(), "catalog1"));
  entityStore.put(createCatalogEntity(101L, ident.name(), "catalog2"));

  // Mark the metalake as not in use directly in the store.
  entityStore.update(
      ident,
      BaseMetalake.class,
      EntityType.METALAKE,
      metalake -> {
        HashMap<String, String> properties = new HashMap<>(metalake.properties());
        properties.put(Metalake.PROPERTY_IN_USE, "false");
        BaseMetalake.Builder builder =
            BaseMetalake.builder()
                .withId(metalake.id())
                .withName(metalake.name())
                .withComment(metalake.comment())
                .withProperties(properties)
                .withVersion(metalake.getVersion())
                .withAuditInfo(metalake.auditInfo());
        return builder.build();
      });

  // Make every catalog propagation call fail, then swap the mock in via reflection.
  doThrow(new RuntimeException("catalog update failed"))
      .when(mockCatalogManager)
      .setMetalakeInUseStatus(any(NameIdentifier.class), anyBoolean());
  FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", mockCatalogManager, true);

  try {
    Assertions.assertThrows(RuntimeException.class, () -> metalakeManager.enableMetalake(ident));
    Assertions.assertFalse(
        MetalakeManager.metalakeInUse(entityStore, ident),
        "Metalake should remain disabled when catalog propagation fails");
    verify(mockCatalogManager, times(2))
        .setMetalakeInUseStatus(any(NameIdentifier.class), anyBoolean());
  } finally {
    // Restore the real CatalogManager and clean up the test metalake.
    FieldUtils.writeField(
        GravitinoEnv.getInstance(), "catalogManager", originalCatalogManager, true);
    metalakeManager.dropMetalake(ident, true);
  }
}

// Builds a minimal catalog entity under the given metalake.
private CatalogEntity createCatalogEntity(Long id, String metalake, String catalog) {
  AuditInfo auditInfo =
      AuditInfo.builder()
          .withCreator(AuthConstants.ANONYMOUS_USER)
          .withCreateTime(Instant.now())
          .withLastModifier(AuthConstants.ANONYMOUS_USER)
          .withLastModifiedTime(Instant.now())
          .build();
  return CatalogEntity.builder()
      .withId(id)
      .withName(catalog)
      .withNamespace(Namespace.of(metalake))
      .withType(Catalog.Type.RELATIONAL)
      .withProvider("test")
      .withComment("")
      .withProperties(ImmutableMap.of())
      .withAuditInfo(auditInfo)
      .build();
}
```