laskoviymishka commented on code in PR #16237:
URL: https://github.com/apache/iceberg/pull/16237#discussion_r3253532652
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,12 +153,28 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
- } catch (Exception e) {
+ } catch (CommitFailedException | CommitStateUnknownException e) {
LOG.warn(
- "Coordinator {} failed to commit for commit {}, will try again next cycle",
- taskId,
+ "Commit {} failed, will retry on next cycle: {}",
commitState.currentCommitId(),
+ e.getMessage(),
e);
+ } catch (RuntimeException e) {
+ if (e instanceof CleanableFailure) {
Review Comment:
`CleanableFailure` doesn’t mean “safe to retry.”
It only means that uncommitted metadata can be cleaned up.
This branch also catches things that are clearly not retryable:
* `ForbiddenException` / 403
* `BadRequestException` / 400
* `NotAuthorizedException` / 401
* `ValidationException`
So a bad credential could keep retrying forever at WARN, with no clear
signal to the operator.
If we want retry behavior here, I’d make it explicit: list the retryable
exception types instead of using `CleanableFailure` as the check.
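
A minimal sketch of what an explicit allow-list could look like. The exception classes are local stand-ins so the snippet compiles on its own (the real ones live in `org.apache.iceberg.exceptions`). `RetryPolicy`, `isRetryable`, and the choice of `ServiceUnavailableException` as the only retryable type are assumptions for illustration, not a proposal for the exact list.

```java
import java.util.Set;

class RetryPolicy {
  // Stand-ins for the Iceberg exception types, declared locally for the sketch.
  static class ServiceUnavailableException extends RuntimeException {
    ServiceUnavailableException(String message) {
      super(message);
    }
  }

  static class ForbiddenException extends RuntimeException {
    ForbiddenException(String message) {
      super(message);
    }
  }

  // Hypothetical allow-list: only the types named here are retried.
  private static final Set<Class<? extends RuntimeException>> RETRYABLE =
      Set.of(ServiceUnavailableException.class);

  // True only if the exception is an instance of an explicitly listed type.
  static boolean isRetryable(RuntimeException e) {
    return RETRYABLE.stream().anyMatch(type -> type.isInstance(e));
  }
}
```

With this shape, a 403 falls through to the fatal path by default, and adding a retryable type is a visible one-line change.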
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,12 +153,28 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
- } catch (Exception e) {
+ } catch (CommitFailedException | CommitStateUnknownException e) {
Review Comment:
There are two different issues here, and both look bad.
`CommitStateUnknownException` should not be retried blindly. Its own Javadoc
says we don’t know whether the commit succeeded, and retrying can create
duplicates. Since the files stay in `commitBuffer`, the next cycle can commit
the same files again. If the first commit actually landed, we get duplicate
rows. This should be fatal and require a manual check of the table state.
`CommitFailedException` is also not something we should swallow. That is
exactly the bug from #15878. A WARN + “retry next cycle” is not enough here:
Kafka offsets may already be flushed, and on rebalance the in-memory commit
state can be lost. Then the data is gone and the operator never sees a real
failure.
Flink lets `CommitFailedException` propagate. I think we should do the same
here.
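
Roughly the shape I have in mind, as a sketch only. The two exception classes are local stand-ins for the Iceberg types, and `CommitHandling` and the `Runnable` parameter are hypothetical, used here in place of `doCommit(partialCommit)`.

```java
class CommitHandling {
  // Stand-ins for the Iceberg exception types, declared locally for the sketch.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(String message) {
      super(message);
    }
  }

  static void commit(Runnable doCommit) {
    try {
      doCommit.run();
    } catch (CommitStateUnknownException e) {
      // The commit may or may not have landed, so do not retry with the
      // same buffered files. Surface it loudly and rethrow.
      System.err.println("Commit state unknown; check table state before restarting");
      throw e;
    }
    // CommitFailedException is deliberately not caught: it propagates to the
    // caller so the failure is visible instead of being logged and dropped.
  }
}
```

Neither exception is converted into a WARN here. Both reach the caller, and the only difference is the extra message for the unknown-state case.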
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,50 @@ public void testCommitError() {
.withRecordCount(5)
.build();
- coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+ assertThatThrownBy(
Review Comment:
`coordinatorTest()` calls `coordinator.process()` directly, so it skips the
real production path.
In prod the flow is:
`CoordinatorThread.run()` catches the exception → marks `terminated = true`
→ next `CommitterImpl.save()` calls `processControlEvents()` → throws
`NotRunningException`.
This test doesn’t cover that. It would still pass even if
`CoordinatorThread` swallowed the failure.
I think we need an end-to-end test that goes through `CoordinatorThread` +
`CommitterImpl`.
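
To make the path concrete, here is a simplified model of the flow described above. It is not the real code: `FakeCoordinatorThread` and `FakeCommitter` are hypothetical stand-ins for `CoordinatorThread` and `CommitterImpl`, `NotRunningException` is declared locally, and the exception message is invented.

```java
class TerminationPathSketch {
  // Local stand-in for the connector's NotRunningException.
  static class NotRunningException extends RuntimeException {
    NotRunningException(String message) {
      super(message);
    }
  }

  // Models a coordinator thread: a failure in the body marks it terminated.
  static class FakeCoordinatorThread extends Thread {
    private final Runnable body;
    private volatile boolean terminated;

    FakeCoordinatorThread(Runnable body) {
      this.body = body;
    }

    @Override
    public void run() {
      try {
        body.run();
      } catch (Exception e) {
        // The failure is recorded here rather than rethrown.
        terminated = true;
      }
    }

    boolean isTerminated() {
      return terminated;
    }
  }

  // Models the committer: save() fails once the coordinator has terminated.
  static class FakeCommitter {
    private final FakeCoordinatorThread coordinator;

    FakeCommitter(FakeCoordinatorThread coordinator) {
      this.coordinator = coordinator;
    }

    void save() {
      if (coordinator.isTerminated()) {
        throw new NotRunningException("Coordinator terminated");
      }
    }
  }

  // Drives the whole path: commit error on the thread, then save() on the caller.
  static boolean saveFailsAfterCommitError() {
    FakeCoordinatorThread thread =
        new FakeCoordinatorThread(() -> { throw new IllegalStateException("commit failed"); });
    thread.start();
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    try {
      new FakeCommitter(thread).save();
      return false;
    } catch (NotRunningException e) {
      return true;
    }
  }
}
```

A test that only calls the loop body directly never touches the `terminated` flag or `save()`, which is why it cannot tell a swallowed failure from a surfaced one.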
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,50 @@ public void testCommitError() {
.withRecordCount(5)
.build();
- coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+ assertThatThrownBy(
+ () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot find partition spec");
// no commit messages sent
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();
}
+ @Test
+ public void testCommitFailedExceptionSwallowed() {
Review Comment:
The comment says *“Verify issue #15878”*, but the test still expects
`CommitFailedException` to be ignored.
That was the bug in #15878: the commit failure was swallowed and data was
lost.
To verify the fix, the test should assert that the next
`CommitterImpl.save()` throws `NotRunningException`, meaning the task moved to
`FAILED` and the operator can see the failure.