rdblue commented on code in PR #14510:
URL: https://github.com/apache/iceberg/pull/14510#discussion_r2505793160
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -253,4 +254,45 @@ public void testCoordinatorCommittedOffsetMerging() {
assertThat(table.currentSnapshot().summary())
.containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3,\"1\":7}");
}
+
+ @Test
+ public void testCoordinatorCommittedOffsetValidation() {
+ // This test demonstrates that the Coordinator's validateAndCommit method
+ // prevents commits when another independent commit has updated the offsets
+ // during the commit process
+
+ // Set the initial offsets
+ table
+ .newAppend()
+ .appendFile(EventTestUtil.createDataFile())
+ .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}")
+ .commit();
+
+ Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER);
+
+ // return the original table state on the first load, so that the update
will happen
+ // during the commit refresh
+
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod();
+
+ // Independently update the offsets
+ table
+ .newAppend()
+ .appendFile(EventTestUtil.createDataFile())
+ .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}")
+ .commit();
+
+ table.refresh();
Review Comment:
This isn't needed after the append right? It's the same table instance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]