dimas-b commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2095767205


##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +446,174 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+                  // table current_snapshot_id.
+                  long currentSnapshotId = base.currentSnapshot().snapshotId();
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base,
+                          currentSnapshotId,
+                          expectedCurrentSnapshotId,
+                          setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  newBase = metadataBuilder.build();
+                  // move the lastSequenceNumber back, to apply snapshot 
properly on the
+                  // current-metadata Seq number are considered increasing 
monotonically
+                  // snapshot over snapshot, the client generates the manifest 
list and hence
+                  // the sequence number can't be changed for a snapshot the 
only possible option
+                  // then is to change the sequenceNumber tracked by 
metadata.json
+                  Class<?> clazz = newBase.getClass();
+                  try {
+                    Field field = clazz.getDeclaredField("lastSequenceNumber");
+                    field.setAccessible(true);
+                    // this should point to the sequence number that current 
tip of the
+                    // branch belongs to, as the new commit will be applied on 
top of this.
+                    field.set(newBase, 
newBase.currentSnapshot().sequenceNumber());
+                  } catch (NoSuchFieldException | IllegalAccessException ex) {
+                    throw new RuntimeException(ex);
+                  }
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
 
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) 
requirement;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to roll back, make this 
Noop.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base,
+      long currentSnapshotId,
+      long expectedCurrentSnapshotId,
+      String updateRefName) {
+    // find the all the snapshots we want to retain which are not the part of 
current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // check all other branches
+      if (snapshotRef.isBranch()) {
+        for (Snapshot ancestor :
+            SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), 
base::snapshot)) {
+          idsToRetain.add(ancestor.snapshotId());
+        }

Review Comment:
   Why not do this for tags? Is it not possible to make a branch from a tag 
later?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -94,6 +104,8 @@
 public class CatalogHandlerUtils {
   private static final Schema EMPTY_SCHEMA = new Schema();
   private static final String INITIAL_PAGE_TOKEN = "";
+  private static final String ROLLBACKABLE_REPLACE_SNAPSHOT =
+      "polaris.rollback.compaction.on-conflict";

Review Comment:
   nit: The REPLACE operation is not limited to compactions, I guess. How about 
`polaris.conflict-resolution.by-operation-type.replace=rollback`?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +446,174 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+                  // table current_snapshot_id.
+                  long currentSnapshotId = base.currentSnapshot().snapshotId();
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base,
+                          currentSnapshotId,
+                          expectedCurrentSnapshotId,
+                          setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  newBase = metadataBuilder.build();
+                  // move the lastSequenceNumber back, to apply snapshot 
properly on the
+                  // current-metadata Seq number are considered increasing 
monotonically
+                  // snapshot over snapshot, the client generates the manifest 
list and hence
+                  // the sequence number can't be changed for a snapshot the 
only possible option
+                  // then is to change the sequenceNumber tracked by 
metadata.json
+                  Class<?> clazz = newBase.getClass();
+                  try {
+                    Field field = clazz.getDeclaredField("lastSequenceNumber");
+                    field.setAccessible(true);
+                    // this should point to the sequence number that current 
tip of the
+                    // branch belongs to, as the new commit will be applied on 
top of this.
+                    field.set(newBase, 
newBase.currentSnapshot().sequenceNumber());
+                  } catch (NoSuchFieldException | IllegalAccessException ex) {
+                    throw new RuntimeException(ex);
+                  }
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
 
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) 
requirement;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to roll back, make this 
Noop.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base,
+      long currentSnapshotId,
+      long expectedCurrentSnapshotId,
+      String updateRefName) {
+    // find the all the snapshots we want to retain which are not the part of 
current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // check all other branches
+      if (snapshotRef.isBranch()) {
+        for (Snapshot ancestor :
+            SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), 
base::snapshot)) {
+          idsToRetain.add(ancestor.snapshotId());
+        }
+      } else if (snapshotRef.isTag()) {
+        idsToRetain.add(snapshotRef.snapshotId());

Review Comment:
   Is this not the same as in line 567?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +446,174 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+                  // table current_snapshot_id.
+                  long currentSnapshotId = base.currentSnapshot().snapshotId();
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base,
+                          currentSnapshotId,
+                          expectedCurrentSnapshotId,
+                          setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  newBase = metadataBuilder.build();
+                  // move the lastSequenceNumber back, to apply snapshot 
properly on the
+                  // current-metadata Seq number are considered increasing 
monotonically
+                  // snapshot over snapshot, the client generates the manifest 
list and hence
+                  // the sequence number can't be changed for a snapshot the 
only possible option
+                  // then is to change the sequenceNumber tracked by 
metadata.json
+                  Class<?> clazz = newBase.getClass();
+                  try {
+                    Field field = clazz.getDeclaredField("lastSequenceNumber");
+                    field.setAccessible(true);
+                    // this should point to the sequence number that current 
tip of the
+                    // branch belongs to, as the new commit will be applied on 
top of this.
+                    field.set(newBase, 
newBase.currentSnapshot().sequenceNumber());

Review Comment:
   Is it critical to have contiguous sequence numbers? When we save the new 
data into storage, some sequence numbers will be reused without proper branches. 
:thinking: 



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +446,174 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+                  // table current_snapshot_id.
+                  long currentSnapshotId = base.currentSnapshot().snapshotId();
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base,
+                          currentSnapshotId,
+                          expectedCurrentSnapshotId,
+                          setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  newBase = metadataBuilder.build();
+                  // move the lastSequenceNumber back, to apply snapshot 
properly on the
+                  // current-metadata Seq number are considered increasing 
monotonically
+                  // snapshot over snapshot, the client generates the manifest 
list and hence
+                  // the sequence number can't be changed for a snapshot the 
only possible option
+                  // then is to change the sequenceNumber tracked by 
metadata.json
+                  Class<?> clazz = newBase.getClass();
+                  try {
+                    Field field = clazz.getDeclaredField("lastSequenceNumber");
+                    field.setAccessible(true);

Review Comment:
   Can we avoid reflection workarounds for inaccessible fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to