amogh-jahagirdar commented on code in PR #11948:
URL: https://github.com/apache/iceberg/pull/11948#discussion_r1928099165


##########
core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRowLineageMetadata {
+
+  @Parameters(name = "formatVersion = {0}")
+  private static List<Integer> formatVersion() {
+    return Ints.asList(TestHelpers.ALL_VERSIONS);
+  }
+
+  @Parameter private int formatVersion;
+
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          7,
+          Types.NestedField.required(1, "x", Types.LongType.get()),
+          Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+          Types.NestedField.required(3, "z", Types.LongType.get()));
+
+  private TableMetadata baseMetadata() {
+    return TableMetadata.buildFromEmpty(formatVersion)
+        .enableRowLineage()
+        .addSchema(TEST_SCHEMA)
+        .setLocation(TEST_LOCATION)
+        .addPartitionSpec(PartitionSpec.unpartitioned())
+        .addSortOrder(SortOrder.unsorted())
+        .build();
+  }
+
+  @TempDir private File tableDir = null;
+
+  @AfterEach
+  public void cleanup() {
+    TestTables.clearTables();
+  }
+
+  @TestTemplate
+  public void testRowLineageSupported() {
+    if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) {
+      
assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull();
+    } else {
+      assertThatThrownBy(() -> 
TableMetadata.buildFromEmpty(formatVersion).enableRowLineage())
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessageContaining("Cannot use row lineage");
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot addRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), newRows);
+
+    TableMetadata firstAddition = 
TableMetadata.buildFrom(base).addSnapshot(addRows).build();
+
+    assertThat(firstAddition.nextRowId()).isEqualTo(newRows);
+
+    Snapshot addMoreRows =
+        new BaseSnapshot(
+            1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", 
firstAddition.nextRowId(), newRows);
+
+    TableMetadata secondAddition =
+        
TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build();
+
+    assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2);
+  }
+
+  @TestTemplate
+  public void testInvalidSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    Long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot invalidLastRow =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId() - 3, newRows);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidLastRow))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'");
+
+    Snapshot invalidNewRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), null);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidNewRows))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Cannot add a snapshot with a null 'added-rows' field when row 
lineage is enabled");
+  }
+
+  @TestTemplate
+  public void testFastAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newFastAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppendBranch() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+    // Appends to a branch should still change last-row-id even if not on 
main, these changes
+    // should also affect commits to main
+
+    String branch = "some_branch";
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    // Write to Branch
+    table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot()).isNull();
+    assertThat(table.snapshot(branch).firstRowId()).isEqualTo(0L);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    // Write to Main
+    
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();

Review Comment:
   Same as above, I don't think this is necessary?



##########
core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRowLineageMetadata {
+
+  @Parameters(name = "formatVersion = {0}")
+  private static List<Integer> formatVersion() {
+    return Ints.asList(TestHelpers.ALL_VERSIONS);
+  }
+
+  @Parameter private int formatVersion;
+
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          7,
+          Types.NestedField.required(1, "x", Types.LongType.get()),
+          Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+          Types.NestedField.required(3, "z", Types.LongType.get()));
+
+  private TableMetadata baseMetadata() {
+    return TableMetadata.buildFromEmpty(formatVersion)
+        .enableRowLineage()
+        .addSchema(TEST_SCHEMA)
+        .setLocation(TEST_LOCATION)
+        .addPartitionSpec(PartitionSpec.unpartitioned())
+        .addSortOrder(SortOrder.unsorted())
+        .build();
+  }
+
+  @TempDir private File tableDir = null;
+
+  @AfterEach
+  public void cleanup() {
+    TestTables.clearTables();
+  }
+
+  @TestTemplate
+  public void testRowLineageSupported() {
+    if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) {
+      
assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull();
+    } else {
+      assertThatThrownBy(() -> 
TableMetadata.buildFromEmpty(formatVersion).enableRowLineage())
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessageContaining("Cannot use row lineage");
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot addRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), newRows);
+
+    TableMetadata firstAddition = 
TableMetadata.buildFrom(base).addSnapshot(addRows).build();
+
+    assertThat(firstAddition.nextRowId()).isEqualTo(newRows);
+
+    Snapshot addMoreRows =
+        new BaseSnapshot(
+            1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", 
firstAddition.nextRowId(), newRows);
+
+    TableMetadata secondAddition =
+        
TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build();
+
+    assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2);
+  }
+
+  @TestTemplate
+  public void testInvalidSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    Long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot invalidLastRow =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId() - 3, newRows);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidLastRow))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'");
+
+    Snapshot invalidNewRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), null);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidNewRows))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Cannot add a snapshot with a null 'added-rows' field when row 
lineage is enabled");
+  }
+
+  @TestTemplate
+  public void testFastAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newFastAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppendBranch() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+    // Appends to a branch should still change last-row-id even if not on 
main, these changes
+    // should also affect commits to main
+
+    String branch = "some_branch";
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    // Write to Branch
+    table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();

Review Comment:
   Nit: I'm not sure this assert is really necessary 



##########
core/src/main/java/org/apache/iceberg/BaseSnapshot.java:
##########
@@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot {
       String operation,
       Map<String, String> summary,
       Integer schemaId,
-      String manifestList) {
+      String manifestList,
+      Long firstRowId,
+      Long addedRows) {

Review Comment:
   If we decide to add a Builder or some other kind of refactoring, I'd also 
prefer to punt. The PR is fairly sizeable and I'd prefer to keep it focused on 
the row lineage core metadata changes (and all of this is rather internal at 
the moment).



##########
core/src/test/java/org/apache/iceberg/TestSnapshotJson.java:
##########
@@ -105,6 +129,40 @@ public void testJsonConversionWithOperation() throws 
IOException {
     assertThat(snapshot.operation()).isEqualTo(expected.operation());
     assertThat(snapshot.summary()).isEqualTo(expected.summary());
     assertThat(snapshot.schemaId()).isEqualTo(expected.schemaId());
+    assertThat(snapshot.firstRowId()).isNull();
+    assertThat(snapshot.addedRows()).isNull();
+  }
+
+  @Test
+  public void testJsonConversionWithRowLineage() throws IOException {
+    int snapshotId = 23;
+    Long parentId = null;
+    Long firstRowId = 20L;
+    Long addedRows = 30L;
+    String manifestList = createManifestListWithManifestFiles(snapshotId, 
parentId);
+
+    Snapshot expected =
+        new BaseSnapshot(
+            0,
+            snapshotId,
+            parentId,
+            System.currentTimeMillis(),
+            null,
+            null,
+            null,
+            manifestList,
+            firstRowId,
+            addedRows);
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
+
+    assertThat(snapshot.snapshotId()).isEqualTo(expected.snapshotId());
+    
assertThat(snapshot.allManifests(ops.io())).isEqualTo(expected.allManifests(ops.io()));
+    assertThat(snapshot.operation()).isNull();
+    assertThat(snapshot.summary()).isNull();
+    assertThat(snapshot.schemaId()).isNull();
+    assertThat(snapshot.firstRowId()).isEqualTo(firstRowId);
+    assertThat(snapshot.addedRows()).isEqualTo(addedRows);
   }

Review Comment:
   Do we have tests for snapshots produced for tables where row lineage is 
not enabled?



##########
core/src/test/java/org/apache/iceberg/TestSnapshotJson.java:
##########
@@ -105,6 +129,40 @@ public void testJsonConversionWithOperation() throws 
IOException {
     assertThat(snapshot.operation()).isEqualTo(expected.operation());
     assertThat(snapshot.summary()).isEqualTo(expected.summary());
     assertThat(snapshot.schemaId()).isEqualTo(expected.schemaId());
+    assertThat(snapshot.firstRowId()).isNull();
+    assertThat(snapshot.addedRows()).isNull();
+  }
+
+  @Test
+  public void testJsonConversionWithRowLineage() throws IOException {
+    int snapshotId = 23;
+    Long parentId = null;
+    Long firstRowId = 20L;
+    Long addedRows = 30L;
+    String manifestList = createManifestListWithManifestFiles(snapshotId, 
parentId);
+
+    Snapshot expected =
+        new BaseSnapshot(
+            0,
+            snapshotId,
+            parentId,
+            System.currentTimeMillis(),
+            null,
+            null,
+            null,
+            manifestList,
+            firstRowId,
+            addedRows);
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
+
+    assertThat(snapshot.snapshotId()).isEqualTo(expected.snapshotId());
+    
assertThat(snapshot.allManifests(ops.io())).isEqualTo(expected.allManifests(ops.io()));
+    assertThat(snapshot.operation()).isNull();
+    assertThat(snapshot.summary()).isNull();
+    assertThat(snapshot.schemaId()).isNull();
+    assertThat(snapshot.firstRowId()).isEqualTo(firstRowId);
+    assertThat(snapshot.addedRows()).isEqualTo(addedRows);
   }

Review Comment:
   Ah missed the assertions above this test for the previous test, great! 



##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -290,7 +298,27 @@ public Snapshot apply() {
         operation(),
         summary(base),
         base.currentSchemaId(),
-        manifestList.location());
+        manifestList.location(),
+        lastRowId,
+        addedRows);
+  }
+
+  private Long calculateAddedRows(List<ManifestFile> manifests) {
+    return manifests.stream()
+        .filter(
+            manifest ->
+                manifest.snapshotId() == null
+                    || Objects.equals(manifest.snapshotId(), this.snapshotId))
+        .mapToLong(
+            manifest -> {
+              Preconditions.checkArgument(
+                  manifest.addedRowsCount() != null,
+                  "Cannot determine number of added rows in snapshot because"
+                      + " the entry for manifest %s is missing the field 
`added-rows-count`",
+                  manifest.path());

Review Comment:
   Sorry if this is a naive question that I missed earlier: how does this 
work for manifest rewrites? In that case there would be new manifest(s) for 
this snapshot but no newly added rows, and this would currently fail the 
check, if I follow correctly.



##########
core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java:
##########
@@ -195,6 +195,26 @@ public void getBool() throws JsonProcessingException {
     assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": 
false}"))).isFalse();
   }
 
+  @Test
+  public void getBoolOrNull() throws JsonProcessingException {
+    assertThat(JsonUtil.getBoolOrNull("x", 
JsonUtil.mapper().readTree("{}"))).isNull();
+
+    assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": 
null}"))).isNull();
+
+    assertThatThrownBy(
+            () -> JsonUtil.getBoolOrNull("x", 
JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a boolean value: x: \"23\"");
+
+    assertThatThrownBy(
+            () -> JsonUtil.getBoolOrNull("x", 
JsonUtil.mapper().readTree("{\"x\": \"true\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a boolean value: x: \"true\"");
+
+    assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": 
true}"))).isTrue();
+    assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": 
false}"))).isFalse();

Review Comment:
   Super nit, feel free to ignore but I'd find this test a bit easier to read 
if we just move all the happy cases together and then the failure cases after



##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -282,6 +283,13 @@ public Snapshot apply() {
       throw new RuntimeIOException(e, "Failed to write manifest list file");
     }
 
+    Long addedRows = null;
+    Long lastRowId = null;
+    if (base.rowLineageEnabled()) {
+      addedRows = calculateAddedRows(manifests);

Review Comment:
   It's an optimization, so we can always just do this later (and it arguably 
makes the code a bit harder to read), but instead of waiting until all the 
manifests are written, what if we accumulate addedRows as we add each 
manifest to the writer?
   
   Something like
   
   
   ```
   // remove the writer.addAll
   for (ManifestFile manifest : manifestFiles) {
     if (manifest.snapshotId() == null
         || Objects.equals(manifest.snapshotId(), this.snapshotId)) {
       Preconditions.checkArgument(
           manifest.addedRowsCount() != null,
           "Cannot determine number of added rows in snapshot because"
               + " the entry for manifest %s is missing the field"
               + " `added-rows-count`",
           manifest.path());
       addedRowsCount += manifest.addedRowsCount();
     }
     writer.add(manifest);
   }
   ```



##########
core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRowLineageMetadata {
+
+  @Parameters(name = "formatVersion = {0}")
+  private static List<Integer> formatVersion() {
+    return Ints.asList(TestHelpers.ALL_VERSIONS);
+  }
+
+  @Parameter private int formatVersion;
+
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          7,
+          Types.NestedField.required(1, "x", Types.LongType.get()),
+          Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+          Types.NestedField.required(3, "z", Types.LongType.get()));
+
+  private TableMetadata baseMetadata() {
+    return TableMetadata.buildFromEmpty(formatVersion)
+        .enableRowLineage()
+        .addSchema(TEST_SCHEMA)
+        .setLocation(TEST_LOCATION)
+        .addPartitionSpec(PartitionSpec.unpartitioned())
+        .addSortOrder(SortOrder.unsorted())
+        .build();
+  }
+
+  @TempDir private File tableDir = null;
+
+  @AfterEach
+  public void cleanup() {
+    TestTables.clearTables();
+  }
+
+  @TestTemplate
+  public void testRowLineageSupported() {
+    if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) {
+      
assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull();
+    } else {
+      assertThatThrownBy(() -> 
TableMetadata.buildFromEmpty(formatVersion).enableRowLineage())
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessageContaining("Cannot use row lineage");
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot addRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), newRows);
+
+    TableMetadata firstAddition = 
TableMetadata.buildFrom(base).addSnapshot(addRows).build();
+
+    assertThat(firstAddition.nextRowId()).isEqualTo(newRows);
+
+    Snapshot addMoreRows =
+        new BaseSnapshot(
+            1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", 
firstAddition.nextRowId(), newRows);
+
+    TableMetadata secondAddition =
+        
TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build();
+
+    assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2);
+  }
+
+  @TestTemplate
+  public void testInvalidSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    Long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot invalidLastRow =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId() - 3, newRows);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidLastRow))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'");
+
+    Snapshot invalidNewRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), null);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidNewRows))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Cannot add a snapshot with a null 'added-rows' field when row 
lineage is enabled");
+  }
+
+  @TestTemplate
+  public void testFastAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newFastAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();

Review Comment:
   My general 2c: I'm not sure I understand the intent behind the assertions 
in this test class that row lineage is still enabled after the commit. I feel 
like the purpose of this test class, for the happy cases, is just to make sure 
the first row/next row IDs are set properly according to the operation (which 
implicitly verifies that row lineage is enabled).



##########
core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRowLineageMetadata {
+
+  @Parameters(name = "formatVersion = {0}")
+  private static List<Integer> formatVersion() {
+    return Ints.asList(TestHelpers.ALL_VERSIONS);
+  }
+
+  @Parameter private int formatVersion;
+
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          7,
+          Types.NestedField.required(1, "x", Types.LongType.get()),
+          Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+          Types.NestedField.required(3, "z", Types.LongType.get()));
+
+  private TableMetadata baseMetadata() {
+    return TableMetadata.buildFromEmpty(formatVersion)
+        .enableRowLineage()
+        .addSchema(TEST_SCHEMA)
+        .setLocation(TEST_LOCATION)
+        .addPartitionSpec(PartitionSpec.unpartitioned())
+        .addSortOrder(SortOrder.unsorted())
+        .build();
+  }
+
+  @TempDir private File tableDir = null;
+
+  @AfterEach
+  public void cleanup() {
+    TestTables.clearTables();
+  }
+
+  @TestTemplate
+  public void testRowLineageSupported() {
+    if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) {
+      
assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull();
+    } else {
+      assertThatThrownBy(() -> 
TableMetadata.buildFromEmpty(formatVersion).enableRowLineage())
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessageContaining("Cannot use row lineage");
+    }
+  }
+
+  @TestTemplate
+  public void testSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot addRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), newRows);
+
+    TableMetadata firstAddition = 
TableMetadata.buildFrom(base).addSnapshot(addRows).build();
+
+    assertThat(firstAddition.nextRowId()).isEqualTo(newRows);
+
+    Snapshot addMoreRows =
+        new BaseSnapshot(
+            1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", 
firstAddition.nextRowId(), newRows);
+
+    TableMetadata secondAddition =
+        
TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build();
+
+    assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2);
+  }
+
+  @TestTemplate
+  public void testInvalidSnapshotAddition() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    Long newRows = 30L;
+
+    TableMetadata base = baseMetadata();
+
+    Snapshot invalidLastRow =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId() - 3, newRows);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidLastRow))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'");
+
+    Snapshot invalidNewRows =
+        new BaseSnapshot(
+            0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", 
base.nextRowId(), null);
+
+    assertThatThrownBy(() -> 
TableMetadata.buildFrom(base).addSnapshot(invalidNewRows))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Cannot add a snapshot with a null 'added-rows' field when row 
lineage is enabled");
+  }
+
+  @TestTemplate
+  public void testFastAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newFastAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppend() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    table.newAppend().appendFile(fileWithRows(30)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+  }
+
+  @TestTemplate
+  public void testAppendBranch() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+    // Appends to a branch should still change last-row-id even if not on 
main, these changes
+    // should also affect commits to main
+
+    String branch = "some_branch";
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    // Write to Branch
+    table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot()).isNull();
+    assertThat(table.snapshot(branch).firstRowId()).isEqualTo(0L);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    // Write to Main
+    
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11);
+
+    // Write again to branch
+    table.newAppend().appendFile(fileWithRows(21)).toBranch(branch).commit();
+    assertThat(table.snapshot(branch).firstRowId()).isEqualTo(30 + 17 + 11);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11 + 21);
+  }
+
+  @TestTemplate
+  public void testDeletes() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    DataFile file = fileWithRows(30);
+
+    table.newAppend().appendFile(file).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+
+    table.newDelete().deleteFile(file).commit();
+
+    // Deleting a file should create a new snapshot which should inherit 
last-row-id from the
+    // previous metadata and not
+    // change last-row-id for this metadata.
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30);
+    assertThat(table.currentSnapshot().addedRows()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(30);
+  }
+
+  @TestTemplate
+  public void testReplace() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+    TableMetadata base = table.ops().current();
+
+    table.ops().commit(base, 
TableMetadata.buildFrom(base).enableRowLineage().build());
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.ops().current().nextRowId()).isEqualTo(0L);
+
+    DataFile filePart1 = fileWithRows(30);
+    DataFile filePart2 = fileWithRows(30);
+    DataFile fileCompacted = fileWithRows(60);
+
+    table.newAppend().appendFile(filePart1).appendFile(filePart2).commit();
+
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(60);
+
+    
table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit();
+
+    // Rewrites are currently just treated as appends. In the future we could 
treat these as no-ops
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+    assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60);
+    assertThat(table.ops().current().nextRowId()).isEqualTo(120);
+  }
+
+  @TestTemplate
+  public void testEnableRowLineageViaProperty() {
+    
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), 
formatVersion);
+
+    assertThat(table.ops().current().rowLineageEnabled()).isFalse();
+
+    // No-op
+    table.updateProperties().set(TableProperties.ROW_LINEAGE, 
"false").commit();
+    assertThat(table.ops().current().rowLineageEnabled()).isFalse();
+
+    // Enable row lineage
+    table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit();
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();
+
+    // Disabling row lineage is not allowed
+    assertThatThrownBy(
+            () -> table.updateProperties().set(TableProperties.ROW_LINEAGE, 
"false").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot disable row lineage once it has been 
enabled");
+
+    // No-op
+    table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit();
+    assertThat(table.ops().current().rowLineageEnabled()).isTrue();

Review Comment:
   Nit: I don't feel like this is strictly necessary? 



##########
core/src/main/java/org/apache/iceberg/TableMetadata.java:
##########
@@ -262,6 +265,8 @@ public String toString() {
   private volatile Map<Long, Snapshot> snapshotsById;
   private volatile Map<String, SnapshotRef> refs;
   private volatile boolean snapshotsLoaded;
+  private final Boolean rowLineageEnabled;

Review Comment:
   Understood the rationale, we really want to prevent writing the field 
accidentally for versions before V3 and having the explicit null makes the 
later check a bit easier. SGTM



##########
core/src/main/java/org/apache/iceberg/TableMetadata.java:
##########
@@ -615,10 +638,15 @@ public TableMetadata replaceProperties(Map<String, 
String> rawProperties) {
     int newFormatVersion =
         PropertyUtil.propertyAsInt(rawProperties, 
TableProperties.FORMAT_VERSION, formatVersion);
 
+    Boolean newRowLineage =

Review Comment:
   Thanks for the explanation, I think that's reasonable. I think this is a 
case where inlining makes it a bit harder to read so I think it's better as it 
is now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to