Baunsgaard commented on code in PR #16237:
URL: https://github.com/apache/iceberg/pull/16237#discussion_r3208678316
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -148,14 +148,18 @@ protected boolean receive(Envelope envelope) {
}
private void commit(boolean partialCommit) {
+ // Do not swallow commit failures: propagate them so CoordinatorThread terminates
+ // and the Kafka Connect task transitions to FAILED instead of silently dropping data
+ // (e.g., CommitFailedException from catalogs that detect concurrent updates).
try {
doCommit(partialCommit);
- } catch (Exception e) {
- LOG.warn(
- "Coordinator {} failed to commit for commit {}, will try again next cycle",
+ } catch (RuntimeException e) {
+ LOG.error(
+ "Coordinator {} failed to commit for commit {}; propagating failure to terminate task",
taskId,
commitState.currentCommitId(),
e);
+ throw e;
Review Comment:
Change it to:
```
throw new RuntimeException(
String.format("Coordinator %s failed to commit %s",
taskId, commitState.currentCommitId()),
e);
```
This lets the catch block further up in `CoordinatorThread.run()` log the error
once, while the wrapping exception still attributes the failure to this location.
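A minimal, self-contained sketch of the wrap-and-rethrow pattern suggested above, under simplifying assumptions: `commit()` adds context and rethrows without logging, and a single top-level catch (standing in for `CoordinatorThread.run()`) is the only place the failure is logged. `SketchCoordinator`, `doCommit`, `runLoop`, and the local `CommitFailedException` are hypothetical stand-ins, not the connector's actual classes, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class WrapAndRethrowSketch {
  private static final Logger LOG = Logger.getLogger(WrapAndRethrowSketch.class.getName());

  // Hypothetical stand-in for org.apache.iceberg.exceptions.CommitFailedException.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  // Hypothetical, simplified coordinator; ids are hard-coded for illustration.
  static class SketchCoordinator {
    private final String taskId = "task-0";
    private final String currentCommitId = "commit-42";

    private void doCommit() {
      // Simulate a catalog rejecting the commit because of a concurrent update.
      throw new CommitFailedException("Glue detected concurrent update");
    }

    void commit() {
      try {
        doCommit();
      } catch (RuntimeException e) {
        // No logging here: attach context and let the caller report it once.
        throw new RuntimeException(
            String.format("Coordinator %s failed to commit %s", taskId, currentCommitId), e);
      }
    }
  }

  // Stand-in for CoordinatorThread.run(): the single place the failure is logged.
  static Throwable runLoop(SketchCoordinator coordinator) {
    try {
      coordinator.commit();
      return null;
    } catch (RuntimeException e) {
      LOG.log(Level.SEVERE, "Coordinator failed, terminating", e);
      return e;
    }
  }

  public static void main(String[] args) {
    Throwable failure = runLoop(new SketchCoordinator());
    System.out.println(failure.getMessage()); // Coordinator task-0 failed to commit commit-42
    System.out.println(failure.getCause().getMessage()); // Glue detected concurrent update
  }
}
```

Because the original exception is passed as the cause, the single log entry still contains the full stack trace pointing back to `commit()`, so nothing is lost by not logging at the throw site.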
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,14 +141,40 @@ public void testCommitError() {
.withRecordCount(5)
.build();
- coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+ assertThatThrownBy(
+ () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot find partition spec");
// no commit messages sent
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();
}
+ @Test
+ public void testCommitFailedExceptionPropagates() {
+ // Reproduce issue #15878: a CommitFailedException from the catalog (e.g., Glue concurrent
+ // update) must propagate out of Coordinator.process() so that CoordinatorThread terminates
+ // and the Kafka Connect task transitions to FAILED instead of silently dropping data.
+ Table spiedTable = spy(table);
+ AppendFiles spiedAppend = spy(table.newAppend());
+ doThrow(new CommitFailedException("Glue detected concurrent update"))
+ .when(spiedAppend)
+ .commit();
+ when(spiedTable.newAppend()).thenReturn(spiedAppend);
+ when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+ assertThatThrownBy(
+ () ->
+ coordinatorTest(
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ EventTestUtil.now()))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Glue detected concurrent update");
Review Comment:
If you make the change above, then I think this needs to be:
```
.hasRootCauseMessage("Glue detected concurrent update");
```
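Why the assertion has to move to the root cause can be sketched in plain Java: once the failure is wrapped, the outer `RuntimeException` carries only the coordinator's message, while the catalog's message and type survive on the innermost cause. The `rootCause` helper below is a hypothetical illustration of the chain AssertJ's `hasRootCauseMessage` inspects; by the same reasoning, the `isInstanceOf(CommitFailedException.class)` check would presumably need to become something like AssertJ's `hasRootCauseInstanceOf(CommitFailedException.class)`. The local `CommitFailedException` is again a stand-in.

```java
public class RootCauseSketch {
  // Hypothetical stand-in for org.apache.iceberg.exceptions.CommitFailedException.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  // Walks the cause chain to its last element (Throwable.getCause() returns null
  // at the end of the chain, including for a self-referencing cause).
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  public static void main(String[] args) {
    RuntimeException wrapped =
        new RuntimeException(
            "Coordinator task-0 failed to commit commit-42",
            new CommitFailedException("Glue detected concurrent update"));

    // The wrapper's own message no longer mentions the catalog error...
    System.out.println(wrapped.getMessage().contains("Glue detected concurrent update")); // false
    // ...but the root cause still carries both the message and the original type.
    Throwable root = rootCause(wrapped);
    System.out.println(root.getMessage()); // Glue detected concurrent update
    System.out.println(root instanceof CommitFailedException); // true
  }
}
```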
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.