laskoviymishka commented on code in PR #16237:
URL: https://github.com/apache/iceberg/pull/16237#discussion_r3255032242
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -289,13 +313,14 @@ public void testCoordinatorCommittedOffsetValidation() {
Snapshot firstSnapshot = table.currentSnapshot();
assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
- // Trigger commit to the table
- coordinatorTest(
- ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
-
- // Assert that the table was not updated and offsets remain
- table.refresh();
- assertThat(table.snapshots()).hasSize(2);
- assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+ // Trigger commit to the table - should throw ValidationException
+ assertThatThrownBy(
+ () ->
+ coordinatorTest(
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ EventTestUtil.now()))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("stale offsets");
Review Comment:
Same shape as the comment above: the previous version of this test asserted
that, on a stale-offset conflict, the table was *not* mutated
(`snapshots().hasSize(2)` and offset still `{"0":7}`).
Those are the actual correctness guarantees of the optimistic-concurrency
guard, and they're no longer checked. The exception-type assertion alone would
pass even if a future regression committed the row delta before throwing.
Let's keep the post-throw assertions:
```java
assertThatThrownBy(...)
.isInstanceOf(ValidationException.class)
.hasMessageContaining("stale offsets");
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(table.currentSnapshot().summary())
.containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
```
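To illustrate the gap, here is a minimal self-contained sketch. It is not Iceberg code: `FakeTable`, `commitValidateFirst`, `commitThenThrow`, and `throwsStale` are hypothetical stand-ins, and `IllegalStateException` stands in for `ValidationException`. It only shows that an exception-only check cannot tell a correct guard from one that mutates before throwing, while a post-throw state check can.

```java
import java.util.ArrayList;
import java.util.List;

public class SideEffectGuardSketch {
  /** Hypothetical stand-in for a table: just an ordered list of snapshot ids. */
  static final class FakeTable {
    final List<String> snapshots = new ArrayList<>();
  }

  /** Intended behavior: validate first, mutate only if validation passes. */
  static void commitValidateFirst(FakeTable table, boolean stale) {
    if (stale) {
      throw new IllegalStateException("stale offsets");
    }
    table.snapshots.add("snap-" + (table.snapshots.size() + 1));
  }

  /** Simulated regression: mutates the table, then throws the same exception. */
  static void commitThenThrow(FakeTable table, boolean stale) {
    table.snapshots.add("snap-" + (table.snapshots.size() + 1));
    if (stale) {
      throw new IllegalStateException("stale offsets");
    }
  }

  /** True if the action throws IllegalStateException mentioning "stale offsets". */
  static boolean throwsStale(Runnable action) {
    try {
      action.run();
      return false;
    } catch (IllegalStateException e) {
      return e.getMessage().contains("stale offsets");
    }
  }

  public static void main(String[] args) {
    FakeTable good = new FakeTable();
    FakeTable regressed = new FakeTable();

    // The exception-only check passes for both implementations.
    boolean goodThrows = throwsStale(() -> commitValidateFirst(good, true));
    boolean regressedThrows = throwsStale(() -> commitThenThrow(regressed, true));
    System.out.println("exception check: good=" + goodThrows + ", regressed=" + regressedThrows);

    // Only the post-throw state check distinguishes them.
    System.out.println(
        "snapshot count: good=" + good.snapshots.size()
            + ", regressed=" + regressed.snapshots.size());
  }
}
```

Both calls satisfy the exception check; only the snapshot count (0 versus 1) exposes the commit-then-throw variant, which is the role the `table.refresh()` plus `hasSize(2)` assertions play in the real test.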
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -135,12 +141,30 @@ public void testCommitError() {
.withRecordCount(5)
.build();
- coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
-
- // no commit messages sent
- assertThat(producer.history()).hasSize(1);
+ assertThatThrownBy(
+ () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot find partition spec");
Review Comment:
The prev-prev version of this test also asserted
`producer.history().hasSize(1)` (only the `StartCommit` event was sent, no
`CommitToTable`) and `table.snapshots().isEmpty()` (no phantom commit landed).
v4 drops both.
The test now only proves that an exception is thrown; it would still pass if a
future regression sent a `CommitToTable` event before failing, or somehow
committed and then threw.
Maybe keep the side-effect guards:
```java
assertThatThrownBy(...)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find partition spec");
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();
```
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -150,12 +150,22 @@ protected boolean receive(Envelope envelope) {
private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
- } catch (Exception e) {
- LOG.warn(
- "Coordinator {} failed to commit for commit {}, will try again next cycle",
- taskId,
- commitState.currentCommitId(),
- e);
+ } catch (RuntimeException e) {
Review Comment:
Two small things on this catch block:
**SLF4J double-printing the message.** Both LOG calls pass `e.getMessage()`
as a `{}` arg *and* `e` as the trailing throwable. SLF4J already prints the
exception's message (and full stack) when the last arg is a `Throwable`, so the
message ends up rendered twice in every log line.
**`taskId` asymmetry.** It's in the ERROR but dropped from the WARN. In a
multi-task cluster you'd lose task identity for retried partial commits.
Both fixed at once:
```java
LOG.warn("Partial commit {} failed for task {}, will retry",
commitState.currentCommitId(), taskId, e);
...
LOG.error("Commit {} failed for task {}",
commitState.currentCommitId(), taskId, e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]