danielcweeks commented on code in PR #14510:
URL: https://github.com/apache/iceberg/pull/14510#discussion_r2505642047


##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -229,4 +230,45 @@ public void testCoordinatorRunning() {
     sourceConsumer.rebalance(ImmutableList.of(tp1));
     assertThat(mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetValidation() {
+    // This test demonstrates that the Coordinator's validateAndCommit method
+    // prevents commits when another independent commit has updated the offsets
+    // during the commit process
+
+    // Set the initial offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}")
+        .commit();
+
+    Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER);
+
+    // return the original table state on the first load, so that the update 
will happen
+    // during the commit refresh
+    
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod();
+
+    // Independently update the offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}")
+        .commit();
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    Snapshot firstSnapshot = table.currentSnapshot();
+    assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, 
"{\"0\":7}");
+
+    // Trigger commit to the table
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), 
EventTestUtil.now());
+
+    // Assert that the table was not updated and offsets remain
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);

Review Comment:
   I'm not sure I fully follow.  We just making sure the number of snapshots is 
what was expected for there not to have been an additional commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to