laskoviymishka commented on code in PR #16434:
URL: https://github.com/apache/iceberg/pull/16434#discussion_r3288022437


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java:
##########
@@ -105,6 +105,10 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS =
       "iceberg.coordinator-executor-keep-alive-timeout-ms";
 
+  private static final String COMMIT_MAX_CONSECUTIVE_FAILURES_PROP =
+      "iceberg.connect.commit.max-consecutive-failures";
+  private static final int COMMIT_MAX_CONSECUTIVE_FAILURES_DEFAULT = 3;

Review Comment:
   Not a blocker, but worth being explicit: defaulting this to `3` is a 
behavioral breaking change. Pre-PR the coordinator terminates on the first 
commit exception; after upgrade, every existing deployment silently tolerates 
two retries (~15 min hidden-failure window at a 5-minute commit interval). The 
three existing tests in this PR that had to be pinned to `=1` to preserve their 
assertions are direct evidence the default is observable behavior, not just a 
tuning knob.
   
   If we keep `3`, I'd at minimum call it out explicitly in the config 
description and in the release notes / upgrade docs so operators know to set 
`=1` if they want to preserve fail-fast. Treating retry as opt-in (default `1`) 
would be cleaner, but I won't block on it. wdyt?



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -158,8 +160,22 @@ private void commit(boolean partialCommit) {
             taskId,
             e);
       } else {
-        LOG.error("Commit {} failed for task {}", 
commitState.currentCommitId(), taskId, e);
-        throw e;
+        consecutiveCommitFailures++;

Review Comment:
   I'd narrow this catch. `catch (RuntimeException e)` sweeps in two classes of 
exception we really don't want to retry:
   
   `CommitStateUnknownException` — its javadoc is explicit that retrying may 
result in duplicate records or unintentional modifications, and 
`SnapshotProducer` itself re-throws it rather than retrying. We have no 
idempotency check before the next attempt, so we'd be the ones producing the 
duplicates.
   
   Permanent failures like `ValidationException`, `ForbiddenException`, 
`NotAuthorizedException`, `BadRequestException` are also `RuntimeException`. 
With a 5-minute commit interval and default=3 they'd burn ~15 minutes silently 
before terminating — the modified `testCoordinatorCommittedOffsetValidation` 
(now pinned to `=1` to keep its assertion) is a tell that retrying validation 
failures isn't desired here either.
   
   I'd either catch `CommitFailedException` specifically, or keep the wider 
catch and short-circuit the non-retryable types:
   
   ```java
   } catch (RuntimeException e) {
     if (e instanceof CommitStateUnknownException) {
       throw e;
     }
     // ... existing partialCommit / retry logic, ideally gated on (e 
instanceof CommitFailedException)
   }
   ```
   
   wdyt?



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,6 +151,7 @@ protected boolean receive(Envelope envelope) {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
+      consecutiveCommitFailures = 0;

Review Comment:
   I think there's a subtle asymmetry here: a partial-commit failure doesn't 
increment the counter (the `if (partialCommit)` branch above just logs), but a 
partial-commit success resets it. So a pattern of full-commit failures 
interleaved with successful partial commits will keep renewing the retry budget 
indefinitely, and the counter never trips.
   
   I'd gate the reset on `!partialCommit` so the counter only tracks the 
codepath that increments it:
   
   ```java
   doCommit(partialCommit);
   if (!partialCommit) {
     consecutiveCommitFailures = 0;
   }
   ```
   
   wdyt?



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java:
##########
@@ -235,6 +239,12 @@ private static ConfigDef newConfigDef() {
         120000L,
         Importance.LOW,
         "config to control coordinator executor keep alive time");
+    configDef.define(
+        COMMIT_MAX_CONSECUTIVE_FAILURES_PROP,
+        ConfigDef.Type.INT,
+        COMMIT_MAX_CONSECUTIVE_FAILURES_DEFAULT,
+        Importance.MEDIUM,
+        "Maximum number of consecutive commit failures before the coordinator 
terminates");

Review Comment:
   Two small things I'd fold in while we're here:
   
   The property name `iceberg.connect.commit.max-consecutive-failures` 
introduces a new `iceberg.connect.commit.*` namespace — the rest of the commit 
knobs in this file live under a different prefix. Once this ships it's public 
and renaming costs a deprecation cycle. Worth aligning with the existing 
commit-prefix convention now.
   
   And I'd add `ConfigDef.Range.atLeast(1)` as the validator argument — `0` or 
negative silently makes the feature unreachable.



##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -170,6 +174,66 @@ public void testCommitFailedExceptionPropagates() {
         .hasMessageContaining("Glue detected concurrent update");
   }
 
+  @Test
+  public void testCommitBoundedRetry() {
+    when(config.commitMaxConsecutiveFailures()).thenReturn(3);
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("transient 
error")).when(spiedAppend).commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    SinkTaskContext context = mock(SinkTaskContext.class);
+    Coordinator coordinator =
+        new Coordinator(catalog, config, ImmutableList.of(), clientFactory, 
context);
+    coordinator.start();
+    initConsumer();
+
+    // first two failures should not throw
+    triggerCommitCycle(coordinator);
+    triggerCommitCycle(coordinator);
+
+    // third consecutive failure should terminate
+    assertThatThrownBy(() -> triggerCommitCycle(coordinator))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("transient error");

Review Comment:
   `testCommitBoundedRetry` only covers fail-fail-terminate. The 
reset-on-success path (fail, fail, succeed, fail, fail — must not terminate) is 
the half of the contract that a future refactor could silently break, and it's 
not exercised anywhere. A second case that interleaves a successful cycle 
between failures would lock that in.
   
   Separately — once the classifier above lands, a one-liner asserting 
`CommitStateUnknownException` propagates on the first failure (rather than 
being retried) would pin the most important new invariant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to