laskoviymishka commented on code in PR #16237:
URL: https://github.com/apache/iceberg/pull/16237#discussion_r3253532652


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,12 +153,28 @@ protected boolean receive(Envelope envelope) {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
-    } catch (Exception e) {
+    } catch (CommitFailedException | CommitStateUnknownException e) {
       LOG.warn(
-          "Coordinator {} failed to commit for commit {}, will try again next cycle",
-          taskId,
+          "Commit {} failed, will retry on next cycle: {}",
           commitState.currentCommitId(),
+          e.getMessage(),
           e);
+    } catch (RuntimeException e) {
+      if (e instanceof CleanableFailure) {

Review Comment:
   `CleanableFailure` doesn’t mean “safe to retry.” 
   
   It only means that uncommitted metadata can be cleaned up.
   
   This branch also catches things that are clearly not retryable:
   
   * `ForbiddenException` / 403
   * `BadRequestException` / 400
   * `NotAuthorizedException` / 401
   * `ValidationException`
   
   So a bad credential could keep retrying forever at WARN, with no clear 
signal to the operator.
   
   If we want retry behavior here, I’d make it explicit: list the retryable 
exception types instead of using `CleanableFailure` as the check.
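   
   A minimal sketch of what an explicit allowlist could look like. The nested 
   types are stand-ins defined only for this example, not the real Iceberg 
   classes, and `TransientCatalogException` is a hypothetical name; which types 
   actually belong on the list is still an open question for this PR.
   
   ```java
   import java.util.List;

   class RetryAllowlistSketch {
     // Stand-in marker that mirrors the name of Iceberg's CleanableFailure.
     interface CleanableFailure {}

     // Stand-in for a 403-style failure: cleanable, but not worth retrying.
     static class ForbiddenException extends RuntimeException implements CleanableFailure {
       ForbiddenException(String message) {
         super(message);
       }
     }

     // Hypothetical transient failure type, invented for this sketch.
     static class TransientCatalogException extends RuntimeException
         implements CleanableFailure {
       TransientCatalogException(String message) {
         super(message);
       }
     }

     // Retry only what is listed here; being a CleanableFailure is not enough.
     private static final List<Class<? extends RuntimeException>> RETRYABLE =
         List.of(TransientCatalogException.class);

     static boolean isRetryable(RuntimeException e) {
       return RETRYABLE.stream().anyMatch(type -> type.isInstance(e));
     }

     // Returns true on success, false if the failure is deferred to the next
     // cycle, and rethrows everything that is not on the allowlist.
     static boolean commit(Runnable doCommit) {
       try {
         doCommit.run();
         return true;
       } catch (RuntimeException e) {
         if (isRetryable(e)) {
           return false;
         }
         throw e;
       }
     }
   }
   ```
   
   With this shape, a 403 stand-in propagates out of `commit` even though it is 
   a `CleanableFailure`, so the operator sees a hard failure.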
   



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,12 +153,28 @@ protected boolean receive(Envelope envelope) {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
-    } catch (Exception e) {
+    } catch (CommitFailedException | CommitStateUnknownException e) {

Review Comment:
   There are two different issues here, and both look bad.
   
   `CommitStateUnknownException` should not be retried blindly. Its own Javadoc 
says we don’t know whether the commit succeeded, and retrying can create 
duplicates. Since the files stay in `commitBuffer`, the next cycle can commit 
the same files again. If the first commit actually landed, we get duplicate 
rows. This should be fatal and require a manual table-state check.
   
   `CommitFailedException` is also not something we should swallow. That is 
exactly the bug from #15878. A WARN + “retry next cycle” is not enough here: 
Kafka offsets may already be flushed, and on rebalance the in-memory commit 
state can be lost. Then the data is gone and the operator never sees a real 
failure.
   
   Flink lets `CommitFailedException` propagate. I think we should do the same 
here.
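   
   To illustrate the duplicate-rows concern, here is a toy simulation. 
   `FlakyTable` and the exception class are stand-ins written for this sketch, 
   not the real table or Iceberg types; the only assumption is that the first 
   append lands but its acknowledgement is lost.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   class UnknownStateRetrySketch {
     // Stand-in that mirrors the name of Iceberg's CommitStateUnknownException.
     static class CommitStateUnknownException extends RuntimeException {
       CommitStateUnknownException(String message) {
         super(message);
       }
     }

     // Toy table: the first append is applied, then the acknowledgement is lost.
     static class FlakyTable {
       final List<String> committedFiles = new ArrayList<>();
       private boolean ackLost = true;

       void append(List<String> files) {
         committedFiles.addAll(files);
         if (ackLost) {
           ackLost = false;
           throw new CommitStateUnknownException("commit outcome unknown");
         }
       }
     }

     // Runs two commit cycles while swallowing the unknown-state failure.
     static List<String> retryBlindly() {
       FlakyTable table = new FlakyTable();
       List<String> commitBuffer = new ArrayList<>(List.of("data-1.parquet"));
       for (int cycle = 0; cycle < 2; cycle++) {
         try {
           table.append(commitBuffer);
           commitBuffer.clear();
         } catch (CommitStateUnknownException e) {
           // Swallowed: the buffer is kept and committed again next cycle.
         }
       }
       return table.committedFiles;
     }
   }
   ```
   
   In this toy model `retryBlindly()` ends with the same file committed twice, 
   which is the duplicate case described above.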
   



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,50 @@ public void testCommitError() {
             .withRecordCount(5)
             .build();
 
-    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    assertThatThrownBy(

Review Comment:
   `coordinatorTest()` calls `coordinator.process()` directly, so it skips the 
real production path.
   
   In prod the flow is:
   
   `CoordinatorThread.run()` catches the exception → marks `terminated = true` 
→ next `CommitterImpl.save()` calls `processControlEvents()` → throws 
`NotRunningException`.
   
   This test doesn’t cover that. It would still pass even if 
`CoordinatorThread` swallowed the failure.
   
   I think we need an end-to-end test that goes through `CoordinatorThread` + 
`CommitterImpl`.
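   
   A rough sketch of the path such a test would need to exercise. All classes 
   below are simplified stand-ins that only borrow the names from this PR; the 
   real `CoordinatorThread` and `CommitterImpl` have different constructors and 
   more state.
   
   ```java
   class CoordinatorThreadSketch {
     // Stand-in for the connector's NotRunningException.
     static class NotRunningException extends RuntimeException {
       NotRunningException(String message) {
         super(message);
       }
     }

     // Stand-in thread: a failure in the coordinator loop marks it terminated.
     static class CoordinatorThread extends Thread {
       private final Runnable coordinator;
       private volatile boolean terminated;

       CoordinatorThread(Runnable coordinator) {
         this.coordinator = coordinator;
       }

       @Override
       public void run() {
         try {
           coordinator.run();
         } catch (RuntimeException e) {
           // The real thread would log here; the failure must not vanish.
         }
         terminated = true;
       }

       boolean isTerminated() {
         return terminated;
       }
     }

     // Stand-in committer: save() surfaces a dead coordinator to the task.
     static class Committer {
       private final CoordinatorThread thread;

       Committer(CoordinatorThread thread) {
         this.thread = thread;
       }

       void save() {
         if (thread.isTerminated()) {
           throw new NotRunningException("Coordinator unexpectedly terminated");
         }
       }
     }

     // Drives the whole path and reports whether save() surfaced the failure.
     static boolean saveFailsAfterCommitError() {
       CoordinatorThread thread =
           new CoordinatorThread(
               () -> {
                 throw new IllegalStateException("commit failed");
               });
       thread.start();
       try {
         thread.join();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       }
       try {
         new Committer(thread).save();
         return false;
       } catch (NotRunningException e) {
         return true;
       }
     }
   }
   ```
   
   The point of driving the thread rather than calling `process()` directly is 
   that the assertion fails if the thread ever stops marking itself terminated.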
   



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,50 @@ public void testCommitError() {
             .withRecordCount(5)
             .build();
 
-    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    assertThatThrownBy(
+            () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot find partition spec");
 
     // no commit messages sent
     assertThat(producer.history()).hasSize(1);
 
     assertThat(table.snapshots()).isEmpty();
   }
 
+  @Test
+  public void testCommitFailedExceptionSwallowed() {

Review Comment:
   The comment says *“Verify issue #15878”*, but the test still expects 
`CommitFailedException` to be ignored.
   
   That was the bug in #15878: the commit failure was swallowed and data was 
lost.
   
   To verify the fix, the test should assert that the next 
`CommitterImpl.save()` throws `NotRunningException`, meaning the task moved to 
`FAILED` and the operator can see the failure.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

