singhpk234 commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2159185184
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +450,203 @@ protected TableMetadata commit(TableOperations ops,
UpdateTableRequest request)
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
- taskOps -> {
+ (taskOps) -> {
TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
- isRetry.set(true);
- // validate requirements
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
try {
- request.requirements().forEach(requirement ->
requirement.validate(base));
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
} catch (CommitFailedException e) {
- // wrap and rethrow outside of tasks to avoid unnecessary
retry
- throw new ValidationFailureException(e);
+ if (!isRollbackCompactionEnabled()) {
+ throw new ValidationFailureException(e);
+ }
+ UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+ findAssertRefSnapshotID(request);
+ MetadataUpdate.SetSnapshotRef setSnapshotRef =
findSetSnapshotRefUpdate(request);
+
+ if (assertRefSnapshotId == null || setSnapshotRef == null) {
+ // This implies the request was not trying to add a
snapshot.
+ throw new ValidationFailureException(e);
+ }
+
+ // snapshot-id the client expects the table
current_snapshot_id
+ long expectedCurrentSnapshotId =
assertRefSnapshotId.snapshotId();
+
+ MetadataUpdate.AddSnapshot snapshotToBeAdded =
findAddSnapshotUpdate(request);
+ if (snapshotToBeAdded == null) {
+ // Re-throw if, there's no snapshot data to be added.
+ throw new ValidationFailureException(e);
+ }
+
+ List<MetadataUpdate> metadataUpdates =
+ generateUpdatesToRemoveNoopSnapshot(
+ base,
+ expectedCurrentSnapshotId,
+ setSnapshotRef.name(),
+ snapshotToBeAdded.snapshot().sequenceNumber());
+
+ if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+ // Nothing can be done as this implies that there were not
all
+ // No-op snapshots (REPLACE) between
expectedCurrentSnapshotId and
+ // currentSnapshotId. hence re-throw the exception caught.
+ throw new ValidationFailureException(e);
+ }
+ // Set back the ref we wanted to set, back to the snapshot-id
+ // the client is expecting the table to be at.
+ metadataBuilder.setBranchSnapshot(
+ expectedCurrentSnapshotId, setSnapshotRef.name());
+
+ // apply the remove snapshots update in the current metadata.
+ // NOTE: we need to setRef to expectedCurrentSnapshotId
first and then apply
+ // remove, as otherwise the remove will drop the reference.
+ // NOTE: we can skip removing the now orphan base. Its not a
hard requirement.
+ // just something good to do, and not leave for Remove
Orphans.
+ // Ref rolled back update correctly to snapshot to be
committed parent now.
+ metadataUpdates.forEach((update ->
update.applyTo(metadataBuilder)));
+ newBase =
setAppropriateLastSeqNumber(metadataBuilder.build());
}
-
- // apply changes
- TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
- request.updates().forEach(update ->
update.applyTo(metadataBuilder));
-
- TableMetadata updated = metadataBuilder.build();
- if (updated.changes().isEmpty()) {
- // do not commit if the metadata has not changed
- return;
+ // double check if the requirements passes now.
+ try {
+ TableMetadata baseWithRemovedSnaps = newBase;
+ request
+ .requirements()
+ .forEach((requirement) ->
requirement.validate(baseWithRemovedSnaps));
+ } catch (CommitFailedException e) {
+ throw new ValidationFailureException(e);
}
- // commit
+ TableMetadata.Builder newMetadataBuilder =
TableMetadata.buildFrom(newBase);
+ request.updates().forEach((update) ->
update.applyTo(newMetadataBuilder));
+ TableMetadata updated = newMetadataBuilder.build();
+ // always commit this
taskOps.commit(base, updated);
});
-
} catch (ValidationFailureException e) {
throw e.wrapped();
}
return ops.current();
}
+ /**
+ * Finds the single {@code AssertRefSnapshotID} requirement carried by the request.
+ *
+ * <p>NOTE(review): the ref name of the returned requirement is not compared here with the ref
+ * that the request's {@code SetSnapshotRef} update targets; confirm a caller checks that they
+ * refer to the same ref.
+ *
+ * @param request the table update request being committed
+ * @return the request's only {@code AssertRefSnapshotID} requirement, or {@code null} when the
+ * request carries none or more than one
+ */
+ private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+ UpdateTableRequest request) {
+ UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+ int total = 0;
+ for (UpdateRequirement requirement : request.requirements()) {
+ if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+ ++total;
+ assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ // With more than one ref assertion it is not safe to roll back, so return null (no-op).
+ // A request with no ref assertion also yields null.
+ return total != 1 ? null : assertRefSnapshotID;
+ }
+
+ /**
+ * Builds the update that removes the rollback snapshots sitting between the current tip of
+ * {@code updateRefName} and {@code expectedCurrentSnapshotId}.
+ *
+ * <p>Starting at the ref's current tip, the walk follows parent links and collects each snapshot
+ * as long as all three conditions hold:
+ *
+ * <ol>
+ * <li>it carries the sequence number expected for its position: {@code
+ * base.lastSequenceNumber()} at the tip, one less at each step back;
+ * <li>{@code isRollbackSnapshot} accepts it;
+ * <li>it is neither the head nor an ancestor of any other ref.
+ * </ol>
+ *
+ * <p>The walk stops at the first snapshot that fails a condition, at a null parent, or on
+ * reaching {@code expectedCurrentSnapshotId}.
+ *
+ * <p>NOTE(review): {@code base.ref(updateRefName)} and {@code base.snapshot(snapshotId)} are
+ * dereferenced without null checks. A missing ref, or a parent snapshot no longer present in
+ * {@code base}, would throw NullPointerException; confirm neither can happen here.
+ *
+ * @param base current table metadata
+ * @param expectedCurrentSnapshotId snapshot id the client asserted the ref currently points at
+ * @param updateRefName name of the ref the request updates
+ * @param newSnapshotSeqNumber sequence number of the snapshot being added; NOTE(review): this
+ * parameter is not read by the method
+ * @return a single-element list holding a {@code RemoveSnapshots} update when the walk reaches
+ * {@code expectedCurrentSnapshotId}, otherwise {@code null}. If the tip already equals
+ * {@code expectedCurrentSnapshotId}, the list holds a {@code RemoveSnapshots} with no ids.
+ */
+ private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+ TableMetadata base,
+ long expectedCurrentSnapshotId,
+ String updateRefName,
+ long newSnapshotSeqNumber) {
+ // Collect the snapshots that must be kept because a ref other than updateRefName points at
+ // them, either directly or through its ancestry.
+ Set<Long> idsToRetain = Sets.newHashSet();
+ for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+ String refName = ref.getKey();
+ SnapshotRef snapshotRef = ref.getValue();
+ if (refName.equals(updateRefName)) {
+ continue;
+ }
+ idsToRetain.add(ref.getValue().snapshotId());
+ // Always check the ancestry of both branches and tags.
+ // This mostly covers the case where a branch was created and then dropped,
+ // a tag was then created, a rollback happened after that tag
+ // was dropped, and the branch was re-created on it.
+ for (Snapshot ancestor :
SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+ idsToRetain.add(ancestor.snapshotId());
+ }
+ }
+
+ List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+ Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of the given branch
+ // The tip must carry the table's latest sequence number; each step back expects one less.
+ long expectedSequenceNumber = base.lastSequenceNumber();
+ Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+ while (snapshotId != null && !Objects.equals(snapshotId,
expectedCurrentSnapshotId)) {
+ Snapshot snap = base.snapshot(snapshotId);
+ if (expectedSequenceNumber != snap.sequenceNumber()) {
+ break;
+ }
+ if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+ // Stop at a snapshot that is not a rollback snapshot, or that another
+ // branch or tag still references.
+ break;
+ }
+ snapshotsToRemove.add(snap.snapshotId());
+ snapshotId = snap.parentId();
+ // Rolling back requires contiguous sequence numbers along the walk.
+ expectedSequenceNumber--;
+ }
+
+ boolean wasExpectedSnapshotReached = Objects.equals(snapshotId,
expectedCurrentSnapshotId);
+ // NOTE(review): the update list is populated even when null is returned below.
+ updateToRemoveSnapshot.add(new
MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+ return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
+ }
+
+ /**
+ * Tells whether {@code snapshot} may be removed when rolling back a conflicting compaction.
+ *
+ * @param snapshot snapshot to inspect
+ * @return {@code true} only for a {@code REPLACE} snapshot whose summary maps {@code
+ * CONFLICT_RESOLUTION_ACTION} to {@code "rollback"} (compared case-insensitively); a missing
+ * summary entry is read as an empty string and yields {@code false}
+ */
+ private boolean isRollbackSnapshot(Snapshot snapshot) {
+ // Only REPLACE snapshots explicitly marked for rollback in their summary may be rolled back.
+ return DataOperations.REPLACE.equals(snapshot.operation())
+ && PropertyUtil.propertyAsString(snapshot.summary(),
CONFLICT_RESOLUTION_ACTION, "")
+ .equalsIgnoreCase("rollback");
+ }
+
+ /**
+ * Finds the single {@code SetSnapshotRef} update carried by the request.
+ *
+ * @param request the table update request being committed
+ * @return the request's only {@code SetSnapshotRef} update, or {@code null} when the request
+ * carries none or more than one
+ */
+ private MetadataUpdate.SetSnapshotRef
findSetSnapshotRefUpdate(UpdateTableRequest request) {
+ int total = 0;
+ MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
+ // Find the SetSnapshotRef update.
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ total++;
+ setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ // With more than one SetSnapshotRef update it is not safe to roll back, so return null
+ // (no-op). A request with no SetSnapshotRef update also yields null.
+ return total != 1 ? null : setSnapshotRefUpdate;
+ }
+
+ /**
+ * Finds the single {@code AddSnapshot} update carried by the request.
+ *
+ * @param request the table update request being committed
+ * @return the request's only {@code AddSnapshot} update, or {@code null} when the request
+ * carries none or more than one
+ */
+ private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest
request) {
+ int total = 0;
+ MetadataUpdate.AddSnapshot addSnapshot = null;
+ // Find the AddSnapshot update.
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.AddSnapshot) {
+ total++;
+ addSnapshot = (MetadataUpdate.AddSnapshot) update;
+ }
+ }
+
+ // With more than one AddSnapshot update it is not safe to roll back, so return null
+ // (no-op). A request with no AddSnapshot update also yields null.
+ return total != 1 ? null : addSnapshot;
+ }
+
+ private TableMetadata setAppropriateLastSeqNumber(TableMetadata newBase) {
+ // TODO: Get rid of the reflection call once TableMetadata have API for it.
Review Comment:
Done: https://github.com/apache/polaris/issues/1917 (assigned to myself).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]