jackieo168 commented on code in PR #6717:
URL: https://github.com/apache/iceberg/pull/6717#discussion_r1096420864


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -105,6 +108,7 @@ public class SparkCatalog extends BaseCatalog {
   private static final Splitter COMMA = Splitter.on(",");
   private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

Review Comment:
   Do we also want to make similar changes to `SparkCachedTableCatalog`?



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java:
##########
@@ -370,4 +370,42 @@ public void 
testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageStartingWith("Cannot override ref, already set snapshot 
id=");
   }
+
+  @Test
+  public void testSnapshotSelectionByBranchWithSchemaChange() throws 
IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new 
SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, 
SimpleRecord.class);
+    firstDf.select("id", 
"data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", 
table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> currentSnapshotResult =
+        spark.read().format("iceberg").option("branch", 
"branch").load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords =
+        
currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, 
currentSnapshotRecords);
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    Dataset<Row> deleteSnapshotResult =
+        spark.read().format("iceberg").option("branch", 
"branch").load(tableLocation);
+    List<SimpleRecord> deletedSnapshotRecords =
+        
deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecordsAfterDeletion = Lists.newArrayList();
+    expectedRecordsAfterDeletion.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, 
deletedSnapshotRecords);

Review Comment:
   @namrathamyske please see if you can leverage the unit tests added in our 
fork version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to