Baunsgaard commented on code in PR #16237:
URL: https://github.com/apache/iceberg/pull/16237#discussion_r3208678316


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -148,14 +148,18 @@ protected boolean receive(Envelope envelope) {
   }
 
   private void commit(boolean partialCommit) {
+    // Do not swallow commit failures: propagate them so CoordinatorThread terminates
+    // and the Kafka Connect task transitions to FAILED instead of silently dropping data
+    // (e.g., CommitFailedException from catalogs that detect concurrent updates).
     try {
       doCommit(partialCommit);
-    } catch (Exception e) {
-      LOG.warn(
-          "Coordinator {} failed to commit for commit {}, will try again next cycle",
+    } catch (RuntimeException e) {
+      LOG.error(
+          "Coordinator {} failed to commit for commit {}; propagating failure to terminate task",
           taskId,
           commitState.currentCommitId(),
           e);
+      throw e;

Review Comment:
   Change it to the following, and drop the `LOG.error(...)` call:
   
   ```
       throw new RuntimeException(
           String.format(
               "Coordinator %s failed to commit %s", taskId, commitState.currentCommitId()),
           e);
   ```
   
   This lets the catch block further up, in `CoordinatorThread.run()`, log the error once, while the wrapper's message and stack trace still attribute the failure to this location.



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,40 @@ public void testCommitError() {
             .withRecordCount(5)
             .build();
 
-    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    assertThatThrownBy(
+            () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot find partition spec");
 
     // no commit messages sent
     assertThat(producer.history()).hasSize(1);
 
     assertThat(table.snapshots()).isEmpty();
   }
 
+  @Test
+  public void testCommitFailedExceptionPropagates() {
+    // Reproduce issue #15878: a CommitFailedException from the catalog (e.g., Glue concurrent
+    // update) must propagate out of Coordinator.process() so that CoordinatorThread terminates
+    // and the Kafka Connect task transitions to FAILED instead of silently dropping data.
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("Glue detected concurrent update"))
+        .when(spiedAppend)
+        .commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    assertThatThrownBy(
+            () ->
+                coordinatorTest(
+                    ImmutableList.of(EventTestUtil.createDataFile()),
+                    ImmutableList.of(),
+                    EventTestUtil.now()))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Glue detected concurrent update");

Review Comment:
   If you make the change above, then I think this needs to be:
   
   ```
           .hasRootCauseMessage("Glue detected concurrent update");
   ```
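   With the wrapping change, the exception the test receives is the generic `RuntimeException`, so `hasMessageContaining(...)` on it no longer finds the catalog text, while the root cause still carries it. The plain-Java sketch below (no AssertJ) mimics that check with a hypothetical `rootCause` helper that follows `getCause()` to the deepest throwable; `CommitFailedStandIn` is a stand-in for Iceberg's `CommitFailedException`. Along the same lines, the `.isInstanceOf(CommitFailedException.class)` line would also stop matching the wrapper; AssertJ's `hasRootCauseInstanceOf(...)` is one option for keeping the type check.

   ```java
   public class RootCauseSketch {

     // Hypothetical stand-in for org.apache.iceberg.exceptions.CommitFailedException.
     static class CommitFailedStandIn extends RuntimeException {
       CommitFailedStandIn(String message) {
         super(message);
       }
     }

     // Follows getCause() until it returns null; roughly the "root cause" whose message
     // hasRootCauseMessage(...) compares against. No cycle detection in this sketch.
     static Throwable rootCause(Throwable t) {
       Throwable current = t;
       while (current.getCause() != null) {
         current = current.getCause();
       }
       return current;
     }

     public static void main(String[] args) {
       RuntimeException wrapped =
           new RuntimeException(
               "Coordinator task-0 failed to commit commit-1",
               new CommitFailedStandIn("Glue detected concurrent update"));

       // false: the wrapper's own message does not contain the catalog text
       System.out.println(wrapped.getMessage().contains("Glue detected concurrent update"));
       // prints the catalog message carried by the root cause
       System.out.println(rootCause(wrapped).getMessage());
     }
   }
   ```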



-- 
This is an automated message from the Apache Git Service.