RussellSpitzer commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2060739409
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -762,7 +777,190 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#create
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ @VisibleForTesting
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#commit
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ (taskOps) -> {
+ TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
+ isRetry.set(true);
+ // Prev PR: https://github.com/apache/iceberg/pull/5888
+ boolean rollbackCompaction =
+ PropertyUtil.propertyAsBoolean(
+ taskOps.current().properties(),
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
+ try {
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompaction) {
+ throw new ValidationFailureException(e);
+ }
+ // Since snapshot has already been created at the client end.
+ // Nothing much can be done, we can move this
+ // to writer specific thing, but it would be cool if catalog
does this for us.
+ // Inspect that the requirements states that snapshot
+ // ref needs to be asserted this usually means in the update
section
+ // it has addSnapshot and setSnapshotRef
+ UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+ int found = 0;
+ for (UpdateRequirement requirement : request.requirements())
{
+ // there should be only add snapshot request
+ if (requirement instanceof
UpdateRequirement.AssertRefSnapshotID) {
+ ++found;
+ addSnapshot = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ if (found != 1) {
+ // TODO: handle this case, find min snapshot id, to
rollback to give it creates
+ // lineage
+ // lets not complicate things rn
+ throw new ValidationFailureException(e);
+ }
+
+ Long parentSnapshotId = addSnapshot.snapshotId();
+ // so we will first check all the snapshots on the top of
+ // base on which the snapshot we want to commit is of type
REPLACE ops.
+ Long parentToRollbackTo =
base.currentSnapshot().snapshotId();
+ List<MetadataUpdate> updateToRemoveSnapshot = new
ArrayList<>();
+ while (!Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+ if (!DataOperations.REPLACE.equals(snap.operation())) {
+ break;
+ }
+ updateToRemoveSnapshot.add(
+ new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+ parentToRollbackTo = snap.parentId();
+ }
+
+ MetadataUpdate.SetSnapshotRef ref = null;
+ found = 0;
+ // find the SetRefName snapshot update
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ ++found;
+ ref = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ if (found != 1 || !Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
Review Comment:
Is this also validating that we don't have another branch that is based on
the commit we are rolling back?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]