RussellSpitzer commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2060790953
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -762,7 +777,190 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ // TODO: Clean this up when CatalogHandlers becomes extensible.
+ // Copy of CatalogHandlers#updateTable (the call this method replaces): a create request is committed through create(...), any other request through commit(...), and the resulting metadata is wrapped in a LoadTableResponse. Throws IllegalStateException when the catalog does not hand back a BaseTransaction / BaseTable.
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]); // field-less schema handed to buildTable on the create path
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction; // cast is needed to reach underlyingOps()
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations(); // cast is needed to reach operations()
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ // TODO: Clean this up when CatalogHandlers becomes extensible.
+ // Copy of CatalogHandlers#create: validates every requirement against ops.current(), builds fresh metadata from empty, applies all updates in request order, commits with a null base, and returns ops.current().
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst(); // format version of the first UpgradeFormatVersion update, if the request has one
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty); // no explicit version requested: use the no-arg buildFromEmpty
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build()); // null is passed as the base metadata for a create
+ return ops.current(); // metadata as reported by ops after the commit
+ }
+
+ @VisibleForTesting
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#commit
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ (taskOps) -> {
+ TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
+ isRetry.set(true);
+ // Prev PR: https://github.com/apache/iceberg/pull/5888
+ boolean rollbackCompaction =
+ PropertyUtil.propertyAsBoolean(
+ taskOps.current().properties(),
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
+ try {
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompaction) {
+ throw new ValidationFailureException(e);
+ }
+ // Since snapshot has already been created at the client end.
+ // Nothing much can be done, we can move this
+ // to writer specific thing, but it would be cool if catalog
does this for us.
+ // Inspect that the requirements states that snapshot
+ // ref needs to be asserted this usually means in the update
section
+ // it has addSnapshot and setSnapshotRef
+ UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+ int found = 0;
+ for (UpdateRequirement requirement : request.requirements())
{
+ // there should be only add snapshot request
+ if (requirement instanceof
UpdateRequirement.AssertRefSnapshotID) {
+ ++found;
+ addSnapshot = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ if (found != 1) {
+ // TODO: handle this case, find min snapshot id, to
rollback to give it creates
+ // lineage
+ // lets not complicate things rn
+ throw new ValidationFailureException(e);
+ }
+
+ Long parentSnapshotId = addSnapshot.snapshotId();
+ // so we will first check all the snapshots on the top of
+ // base on which the snapshot we want to commit is of type
REPLACE ops.
+ Long parentToRollbackTo =
base.currentSnapshot().snapshotId();
+ List<MetadataUpdate> updateToRemoveSnapshot = new
ArrayList<>();
+ while (!Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+ if (!DataOperations.REPLACE.equals(snap.operation())) {
+ break;
+ }
+ updateToRemoveSnapshot.add(
+ new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+ parentToRollbackTo = snap.parentId();
+ }
+
+ MetadataUpdate.SetSnapshotRef ref = null;
+ found = 0;
+ // find the SetRefName snapshot update
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ ++found;
+ ref = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ if (found != 1 || !Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ // nothing can be done as this implies there was a non
replace
+ // snapshot in between or there is more than setRef ops,
we don't know where
+ // to go.
+ throw new ValidationFailureException(e);
+ }
+
+ // first we should also set back the ref we wanted to set,
back to the base
+ // on which the current update is based on.
+ metadataBuilder.setBranchSnapshot(parentSnapshotId,
ref.name());
+
+ // apply the remove snapshots update in the current metadata.
+ // NOTE: we need to setRef to parent first and then apply
remove as the remove
+ // will drop. The tags / branch which don't have reference.
+ // NOTE: we can skip removing the now orphan base. Its not a
hard requirement.
+ // just something good to do, and not leave for Remove
Orphans.
+ updateToRemoveSnapshot.forEach((update ->
update.applyTo(metadataBuilder)));
+ // Ref rolled back update correctly to snapshot to be
committed parent now.
+ newBase = metadataBuilder.build();
+ // move the lastSequenceNumber back, to apply snapshot
properly.
+ // Seq number are considered increasing monotonically,
snapshot over snapshot, so
+ // this is important.
+ Class<?> clazz = newBase.getClass();
+ try {
+ Field field = clazz.getDeclaredField("lastSequenceNumber");
+ field.setAccessible(true);
+ // this should point to the sequence number that current
tip of the
+ // branch belongs to, as the new commit will be applied on
top of this.
+ field.set(newBase,
newBase.currentSnapshot().sequenceNumber());
+ } catch (NoSuchFieldException | IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ // double check if the requirements passes now.
+ try {
+ TableMetadata baseWithRemovedSnaps = newBase;
+ request
+ .requirements()
+ .forEach((requirement) ->
requirement.validate(baseWithRemovedSnaps));
+ } catch (CommitFailedException e) {
+ throw new ValidationFailureException(e);
+ }
+
+ TableMetadata.Builder newMetadataBuilder =
TableMetadata.buildFrom(newBase);
+ request.updates().forEach((update) ->
update.applyTo(newMetadataBuilder));
+ TableMetadata updated = newMetadataBuilder.build();
+ // always commit this
+ taskOps.commit(base, updated);
Review Comment:
I think this is an interesting approach and probably the right one, but I'm
wondering if you considered just reloading the metadata.json from the time of
the rollback and applying the commit to that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]