stevenzwu commented on code in PR #16689:
URL: https://github.com/apache/iceberg/pull/16689#discussion_r3365137715
##########
core/src/main/java/org/apache/iceberg/TrackingBuilder.java:
##########
@@ -143,6 +149,29 @@ Tracking build() {
replacedPositions);
}
+ /** Derives the output status from the source, the snapshot, and any mutations. */
+ private EntryStatus deriveStatus() {
Review Comment:
The rules `deriveStatus` encodes are non-obvious — the same-commit `ADDED`
carve-out and the same-commit pass-through exist for distinct pipeline reasons
that the parameterized test makes legible only after careful reading. Worth
documenting on the method itself.
Proposed (Javadoc + inline comments at the non-obvious branches):
```java
/**
 * Derives the output status from the source entry, the commit snapshot, and any mutations.
 *
 * <p>Rules:
 * <ul>
 *   <li>Fresh add (no source) → {@code ADDED}.
 *   <li>Mutation in this commit → {@code MODIFIED}, except a same-commit append + same-commit
 *       mutation (e.g., DV attached to a freshly added file), which stays {@code ADDED} so a
 *       single commit is not split into ADDED+MODIFIED entries for the same file.
 *   <li>No mutation, same commit as the source → preserve the source status (avoids demoting a
 *       still-current {@code ADDED}/{@code MODIFIED} on a no-op pipeline pass-through).
 *   <li>No mutation, later commit than the source → {@code EXISTING} (carry-forward).
 * </ul>
 *
 * <p>{@code DELETED} and {@code REPLACED} are emitted only via the {@code terminal()} factory,
 * never by this method.
 */
private EntryStatus deriveStatus() {
  if (source == null) {
    return EntryStatus.ADDED;
  }
  boolean sameSnapshot = source.snapshotId() != null && source.snapshotId() == newSnapshotId;
  if (mutated) {
    // Same-commit append + same-commit mutation (e.g., DV attach in the same commit) stays ADDED.
    if (source.status() == EntryStatus.ADDED && sameSnapshot) {
      return EntryStatus.ADDED;
    }
    return EntryStatus.MODIFIED;
  }
  // Same commit, no mutation: preserve the source's live status (don't demote on a no-op stage).
  if (sameSnapshot) {
    return source.status();
  }
  return EntryStatus.EXISTING;
}
```
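For reference, here is a standalone sketch that replays the rules against the twelve rows of the parameterized test. It is only an illustration under stated assumptions: `DeriveStatusSketch`, its nested `EntryStatus` enum, and the flattened method signature are hypothetical stand-ins, not the PR's classes, and the source snapshot id is assumed to be `42` to match `sameSnapshot` in the test.

```java
public class DeriveStatusSketch {
  // Hypothetical stand-in for the PR's EntryStatus (terminal statuses omitted).
  enum EntryStatus { ADDED, EXISTING, MODIFIED }

  // sourceStatus == null models "no source" (fresh add). Assumed signature, for illustration only.
  static EntryStatus deriveStatus(
      EntryStatus sourceStatus, Long sourceSnapshotId, long newSnapshotId, boolean mutated) {
    if (sourceStatus == null) {
      return EntryStatus.ADDED;
    }
    boolean sameSnapshot =
        sourceSnapshotId != null && sourceSnapshotId.longValue() == newSnapshotId;
    if (mutated) {
      // Same-commit add + same-commit mutation stays ADDED.
      if (sourceStatus == EntryStatus.ADDED && sameSnapshot) {
        return EntryStatus.ADDED;
      }
      return EntryStatus.MODIFIED;
    }
    // No mutation: pass through within the same commit, carry forward otherwise.
    return sameSnapshot ? sourceStatus : EntryStatus.EXISTING;
  }

  // Throws if a row of the table disagrees with the rules; source snapshot id assumed to be 42.
  static void check(EntryStatus source, long newId, boolean mutated, EntryStatus expected) {
    EntryStatus actual = deriveStatus(source, 42L, newId, mutated);
    if (actual != expected) {
      throw new AssertionError(source + ", " + newId + ", " + mutated + " -> " + actual);
    }
  }

  public static void main(String[] args) {
    long same = 42L;
    long later = 999L;
    check(EntryStatus.ADDED, same, false, EntryStatus.ADDED);
    check(EntryStatus.ADDED, same, true, EntryStatus.ADDED);
    check(EntryStatus.ADDED, later, false, EntryStatus.EXISTING);
    check(EntryStatus.ADDED, later, true, EntryStatus.MODIFIED);
    check(EntryStatus.EXISTING, same, false, EntryStatus.EXISTING);
    check(EntryStatus.EXISTING, same, true, EntryStatus.MODIFIED);
    check(EntryStatus.EXISTING, later, false, EntryStatus.EXISTING);
    check(EntryStatus.EXISTING, later, true, EntryStatus.MODIFIED);
    check(EntryStatus.MODIFIED, same, false, EntryStatus.MODIFIED);
    check(EntryStatus.MODIFIED, same, true, EntryStatus.MODIFIED);
    check(EntryStatus.MODIFIED, later, false, EntryStatus.EXISTING);
    check(EntryStatus.MODIFIED, later, true, EntryStatus.MODIFIED);
    System.out.println("all rows match");
  }
}
```

The only row where a mutation does not yield `MODIFIED` is `ADDED` + same snapshot, which is the carve-out the Javadoc should call out.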
##########
core/src/test/java/org/apache/iceberg/TestTrackingStruct.java:
##########
@@ -433,6 +453,81 @@ void testExistingToTerminalTransitions() {
assertThat(replaced.snapshotId()).isEqualTo(999L);
}
+ private static Stream<Arguments> deriveStatusCases() {
+ long sameSnapshot = 42L;
+ long laterSnapshot = 999L;
+ return Stream.of(
+ // ADDED source
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, false, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, true, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, true, EntryStatus.MODIFIED),
+ // EXISTING source
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, true, EntryStatus.MODIFIED),
+ // MODIFIED source
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, false, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, true, EntryStatus.MODIFIED));
+ }
+
+ @ParameterizedTest
+ @MethodSource("deriveStatusCases")
+ void testDeriveStatus(
+ EntryStatus sourceStatus, long newSnapshotId, boolean mutate, EntryStatus expected) {
+ Tracking source = sourceTrackingWithStatus(sourceStatus);
+ TrackingBuilder builder = TrackingBuilder.from(source, newSnapshotId);
+ if (mutate) {
+ builder.dvUpdated();
+ }
+
+ assertThat(builder.build().status()).isEqualTo(expected);
+ }
+
+ @Test
+ void testExistingPreservesSourceSnapshotId() {
+ Tracking source = sourceTracking();
+ Tracking existing = TrackingBuilder.from(source, 999L).build();
+ assertThat(existing.status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(existing.snapshotId()).isEqualTo(source.snapshotId());
+ }
+
+ @Test
+ void testModifiedUsesNewSnapshotId() {
+ Tracking source = sourceTracking();
+ Tracking modified = TrackingBuilder.from(source, 999L).dvUpdated().build();
+ assertThat(modified.status()).isEqualTo(EntryStatus.MODIFIED);
+ assertThat(modified.snapshotId()).isEqualTo(999L);
Review Comment:
nit: we can't see which `snapshotId` the `sourceTracking` method uses. If it
happened to use the same snapshot id `999`, this assertion would pass even if
the builder kept the source id, and the test coverage would be moot. We could
add an assertion like
`assertThat(modified.snapshotId()).isNotEqualTo(source.snapshotId())`.
Similarly, for line 495 above, we can add an assertion that the snapshot id is
not equal to `999` (the new snapshot id).
Basically, for every snapshotId assertion, also assert that it is not equal to
the other value.
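To illustrate why the extra assertion matters, here is a small self-contained sketch. Everything in it is hypothetical: `SnapshotIdAssertionSketch`, its minimal `Tracking` stand-in, and `buggyModify` are not the PR's code, and plain boolean checks stand in for the AssertJ calls. It models a builder that wrongly keeps the source snapshot id, plus a fixture whose id happens to collide with the new one.

```java
public class SnapshotIdAssertionSketch {
  // Hypothetical stand-in for Tracking that carries only a snapshot id.
  static final class Tracking {
    private final long snapshotId;

    Tracking(long snapshotId) {
      this.snapshotId = snapshotId;
    }

    long snapshotId() {
      return snapshotId;
    }
  }

  // Deliberately buggy: keeps the source snapshot id instead of using the new one.
  static Tracking buggyModify(Tracking source, long newSnapshotId) {
    return new Tracking(source.snapshotId());
  }

  // Mirrors isEqualTo(newSnapshotId) alone.
  static boolean weakCheck(Tracking source, Tracking modified, long newSnapshotId) {
    return modified.snapshotId() == newSnapshotId;
  }

  // Mirrors isEqualTo(newSnapshotId) plus isNotEqualTo(source.snapshotId()).
  static boolean strongCheck(Tracking source, Tracking modified, long newSnapshotId) {
    return modified.snapshotId() == newSnapshotId
        && modified.snapshotId() != source.snapshotId();
  }

  public static void main(String[] args) {
    long newSnapshotId = 999L;
    // Fixture that accidentally shares the new snapshot id.
    Tracking collidingSource = new Tracking(999L);
    Tracking modified = buggyModify(collidingSource, newSnapshotId);

    // The weak check passes, so the bug is masked by the fixture.
    System.out.println("weak check passes: " + weakCheck(collidingSource, modified, newSnapshotId));
    // The strong check fails, which exposes the colliding fixture.
    System.out.println(
        "strong check passes: " + strongCheck(collidingSource, modified, newSnapshotId));
  }
}
```

With a colliding fixture the weak check cannot tell a correct builder from a buggy one, while the paired not-equal check fails fast and points at the fixture.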
##########
core/src/test/java/org/apache/iceberg/TestTrackingStruct.java:
##########
@@ -433,6 +453,81 @@ void testExistingToTerminalTransitions() {
assertThat(replaced.snapshotId()).isEqualTo(999L);
}
+ private static Stream<Arguments> deriveStatusCases() {
+ long sameSnapshot = 42L;
+ long laterSnapshot = 999L;
+ return Stream.of(
+ // ADDED source
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, false, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, true, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, true, EntryStatus.MODIFIED),
+ // EXISTING source
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, true, EntryStatus.MODIFIED),
Review Comment:
`EXISTING` entries carry the *original add* snapshot's id, so
`source.snapshotId() == newSnapshotId` (sameSnapshot) is contradictory in
production — a writer wouldn't emit an `EXISTING` entry tagged with the current
commit's snapshot id; if it really were added in this commit, the correct
status is `ADDED`.
Both `EXISTING + sameSnapshot` rows here (lines 466 and 467) encode behavior
on inputs that no real path produces. Either drop them so every row maps to a
realistic scenario, or leave a one-line comment noting they're for rule-closure
rather than reachable cases. Worth disambiguating since the rest of the table
is genuinely scenario-driven.
##########
core/src/test/java/org/apache/iceberg/TestTrackingStruct.java:
##########
@@ -433,6 +453,81 @@ void testExistingToTerminalTransitions() {
assertThat(replaced.snapshotId()).isEqualTo(999L);
}
+ private static Stream<Arguments> deriveStatusCases() {
+ long sameSnapshot = 42L;
+ long laterSnapshot = 999L;
+ return Stream.of(
+ // ADDED source
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, false, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, true, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, true, EntryStatus.MODIFIED),
+ // EXISTING source
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, true, EntryStatus.MODIFIED),
+ // MODIFIED source
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, false, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, true, EntryStatus.MODIFIED));
+ }
+
+ @ParameterizedTest
+ @MethodSource("deriveStatusCases")
+ void testDeriveStatus(
+ EntryStatus sourceStatus, long newSnapshotId, boolean mutate, EntryStatus expected) {
+ Tracking source = sourceTrackingWithStatus(sourceStatus);
+ TrackingBuilder builder = TrackingBuilder.from(source, newSnapshotId);
+ if (mutate) {
+ builder.dvUpdated();
+ }
+
+ assertThat(builder.build().status()).isEqualTo(expected);
+ }
+
+ @Test
+ void testExistingPreservesSourceSnapshotId() {
+ Tracking source = sourceTracking();
+ Tracking existing = TrackingBuilder.from(source, 999L).build();
+ assertThat(existing.status()).isEqualTo(EntryStatus.EXISTING);
+ assertThat(existing.snapshotId()).isEqualTo(source.snapshotId());
+ }
+
+ @Test
+ void testModifiedUsesNewSnapshotId() {
+ Tracking source = sourceTracking();
+ Tracking modified = TrackingBuilder.from(source, 999L).dvUpdated().build();
+ assertThat(modified.status()).isEqualTo(EntryStatus.MODIFIED);
+ assertThat(modified.snapshotId()).isEqualTo(999L);
+ }
+
+ @Test
+ void testManifestDVPositionsProduceModified() {
+ ByteBuffer deletedBytes = ByteBuffer.wrap(new byte[] {1, 2});
+ ByteBuffer replacedBytes = ByteBuffer.wrap(new byte[] {3, 4});
+
+ // cross-commit: ADDED source carried into a new snapshot with MDV positions
+ Tracking addedSource = manifestSourceTracking();
+ Tracking crossCommit =
+ TrackingBuilder.from(addedSource, 999L).deletedPositions(deletedBytes).build();
+ assertThat(crossCommit.status()).isEqualTo(EntryStatus.MODIFIED);
+ assertThat(crossCommit.snapshotId()).isEqualTo(999L);
+ assertThat(crossCommit.deletedPositions()).isEqualTo(deletedBytes);
+
+ // same-commit: EXISTING source mutated within its own snapshot
Review Comment:
Is this really a valid scenario in practice? I raised the same question in the
other comment on the test parameters.
##########
core/src/test/java/org/apache/iceberg/TestTrackingStruct.java:
##########
@@ -433,6 +453,81 @@ void testExistingToTerminalTransitions() {
assertThat(replaced.snapshotId()).isEqualTo(999L);
}
+ private static Stream<Arguments> deriveStatusCases() {
+ long sameSnapshot = 42L;
+ long laterSnapshot = 999L;
+ return Stream.of(
+ // ADDED source
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, false, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, sameSnapshot, true, EntryStatus.ADDED),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.ADDED, laterSnapshot, true, EntryStatus.MODIFIED),
+ // EXISTING source
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.EXISTING, laterSnapshot, true, EntryStatus.MODIFIED),
+ // MODIFIED source
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, false, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, sameSnapshot, true, EntryStatus.MODIFIED),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, false, EntryStatus.EXISTING),
+ Arguments.of(EntryStatus.MODIFIED, laterSnapshot, true, EntryStatus.MODIFIED));
+ }
+
+ @ParameterizedTest
+ @MethodSource("deriveStatusCases")
+ void testDeriveStatus(
+ EntryStatus sourceStatus, long newSnapshotId, boolean mutate, EntryStatus expected) {
+ Tracking source = sourceTrackingWithStatus(sourceStatus);
+ TrackingBuilder builder = TrackingBuilder.from(source, newSnapshotId);
+ if (mutate) {
+ builder.dvUpdated();
Review Comment:
This only tests the `dvUpdated` mutation, not `deletedPositions` or
`replacedPositions`. For a status-transition test, though, this is probably OK.