danielcweeks commented on code in PR #14506:
URL: https://github.com/apache/iceberg/pull/14506#discussion_r2493032054


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -284,6 +289,29 @@ private void commitToTable(
     }
   }
 
+  private void validateAndCommit(
+      PendingUpdate<?> pendingUpdate, String branch, Map<Integer, Long> expectedOffsets) {
+    CommitValidator validator =
+        (base, metadata) -> {
+          Map<Integer, Long> lastCommittedOffsets = lastCommittedOffsetsForTable(base, branch);
+
+          if (expectedOffsets == null || expectedOffsets.isEmpty()) {
+            return; // there are no stored offsets, so assume we're starting with new offsets
+          }
+
+          if (!expectedOffsets.equals(lastCommittedOffsets)) {
+            throw new CommitFailedException(
+                "Latest offsets do not match expected offsets for this commit.");
+          }
+        };

Review Comment:
   I debated this, but I'm not sure what the expected behavior should be in 
that case. We likely want to error there as well, but the scenario that would 
lead to it is less clear to me.
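
   To make the question concrete, here is a minimal standalone sketch of the 
stricter behavior. It assumes "that case" means the early return when no 
expected offsets are stored while the table already has committed offsets. 
`OffsetCheckSketch`, `validateOffsets`, and `OffsetMismatchException` are 
hypothetical names used only for illustration (the exception stands in for 
`CommitFailedException`); this is not the PR's code and may not match the 
intended semantics.

```java
import java.util.Map;

// Hypothetical, self-contained illustration; not part of Coordinator.java.
final class OffsetCheckSketch {

  // Stand-in for CommitFailedException so the sketch has no Iceberg dependency.
  static final class OffsetMismatchException extends RuntimeException {
    OffsetMismatchException(String message) {
      super(message);
    }
  }

  // Compares the offsets the coordinator expects with the offsets last
  // committed to the table, and fails on any disagreement.
  static void validateOffsets(
      Map<Integer, Long> expectedOffsets, Map<Integer, Long> lastCommittedOffsets) {
    boolean noExpected = expectedOffsets == null || expectedOffsets.isEmpty();
    boolean noCommitted = lastCommittedOffsets == null || lastCommittedOffsets.isEmpty();

    if (noExpected && noCommitted) {
      return; // nothing stored on either side, so this is a fresh start
    }

    if (noExpected) {
      // Assumed stricter behavior: the table has offsets we did not expect.
      throw new OffsetMismatchException(
          "Table has committed offsets, but no expected offsets were provided.");
    }

    if (!expectedOffsets.equals(lastCommittedOffsets)) {
      throw new OffsetMismatchException(
          "Latest offsets do not match expected offsets for this commit.");
    }
  }
}
```

   With this variant, `validateOffsets(Map.of(), Map.of())` and a call with two 
equal maps both pass, while empty expected offsets against a table that already 
has offsets would fail the commit instead of passing silently.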



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

