jackieo168 commented on code in PR #6717:
URL: https://github.com/apache/iceberg/pull/6717#discussion_r1096420864
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -105,6 +108,7 @@ public class SparkCatalog extends BaseCatalog {
   private static final Splitter COMMA = Splitter.on(",");
   private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

Review Comment:
   Do we also want to make similar changes to `SparkCachedTableCatalog`?

##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java:
##########
@@ -370,4 +370,42 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
   }
+
+  @Test
+  public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> currentSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords =
+        currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    Dataset<Row> deleteSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> deletedSnapshotRecords =
+        deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecordsAfterDeletion = Lists.newArrayList();
+    expectedRecordsAfterDeletion.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecordsAfterDeletion, deletedSnapshotRecords);
+  }
 }

Review Comment:
   @namrathamyske please see if you can leverage the unit tests added in our fork version.
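For context on the first comment above, here is a minimal, self-contained sketch of the kind of selector parsing `SparkCachedTableCatalog` would need if it mirrors the `BRANCH` pattern added to `SparkCatalog` in this PR. Only the regex comes from the diff; the class, method, and variable names below are invented for illustration and are not taken from the PR:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; not the actual SparkCachedTableCatalog code.
public class BranchSelectorSketch {
  // Mirrors the BRANCH pattern added to SparkCatalog in this PR.
  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

  // Returns the branch name if the selector string matches, else null.
  static String parseBranch(String selector) {
    Matcher branch = BRANCH.matcher(selector);
    return branch.matches() ? branch.group(1) : null;
  }

  public static void main(String[] args) {
    System.out.println(parseBranch("branch_main"));       // prints "main"
    System.out.println(parseBranch("snapshot_id_12345")); // prints "null"
  }
}
```

Note that `branch_(.*)` greedily captures everything after the `branch_` prefix, so branch names that themselves contain underscores (e.g. `branch_my_feature`) would resolve to `my_feature`.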