dramaticlly commented on code in PR #12172:
URL: https://github.com/apache/iceberg/pull/12172#discussion_r1945867503


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -235,6 +235,51 @@ public void testStartVersion() throws Exception {
         .isEqualTo(0);
   }
 
+  @Test
+  public void testIncrementalRewrite() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), location);
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, 
ThreeColumnRecord.class).coalesce(1);
+
+    // Write first increment to source table
+    dfA.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    spark.read().format("iceberg").load(location).show();
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1);
+
+    // Replicate first increment to target table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .execute();
+    copyTableFiles(result);
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+
+    // Write second increment to source table
+    List<ThreeColumnRecord> recordsB =
+        Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB"));
+    Dataset<Row> dfB = spark.createDataFrame(recordsB, 
ThreeColumnRecord.class).coalesce(1);
+    dfB.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2);
+
+    // Replicate second increment to target table
+    Table sourceTableIncrement = TABLES.load(location);
+    Table targetTable = TABLES.load(targetTableLocation());
+    String targetTableMetadata = 
currentMetadata(targetTable).metadataFileLocation();
+    String startVersion = fileName(targetTableMetadata);
+    RewriteTablePath.Result resultB =
+        actions()
+            .rewriteTablePath(sourceTableIncrement)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .startVersion(startVersion)
+            .execute();
+    copyTableFiles(resultB);
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2);

Review Comment:
   instead of verify the count, let's verify the data reads from both source 
and target table
   ```java
       //  might need orderBy to ensure deterministic ordering
       List<Object[]> actual = rows(targetTableLocation());
       List<Object[]> expected = rows(location);
       assertEquals("Rows should match after copy", expected, actual);
   ```



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -235,6 +235,51 @@ public void testStartVersion() throws Exception {
         .isEqualTo(0);
   }
 
+  @Test
+  public void testIncrementalRewrite() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), location);
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, 
ThreeColumnRecord.class).coalesce(1);
+
+    // Write first increment to source table
+    dfA.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    spark.read().format("iceberg").load(location).show();
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1);
+
+    // Replicate first increment to target table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .execute();
+    copyTableFiles(result);
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+
+    // Write second increment to source table
+    List<ThreeColumnRecord> recordsB =
+        Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB"));
+    Dataset<Row> dfB = spark.createDataFrame(recordsB, 
ThreeColumnRecord.class).coalesce(1);
+    dfB.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2);
+
+    // Replicate second increment to target table
+    Table sourceTableIncrement = TABLES.load(location);
+    Table targetTable = TABLES.load(targetTableLocation());
+    String targetTableMetadata = 
currentMetadata(targetTable).metadataFileLocation();
+    String startVersion = fileName(targetTableMetadata);
+    RewriteTablePath.Result resultB =

Review Comment:
   let's call it `incrementalRewriteResult` instead of `resultB`



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -235,6 +235,51 @@ public void testStartVersion() throws Exception {
         .isEqualTo(0);
   }
 
+  @Test
+  public void testIncrementalRewrite() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), location);
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, 
ThreeColumnRecord.class).coalesce(1);
+
+    // Write first increment to source table
+    dfA.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    spark.read().format("iceberg").load(location).show();

Review Comment:
   looks like only needed for debug?



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -235,6 +235,51 @@ public void testStartVersion() throws Exception {
         .isEqualTo(0);
   }
 
+  @Test
+  public void testIncrementalRewrite() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), location);
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, 
ThreeColumnRecord.class).coalesce(1);
+
+    // Write first increment to source table
+    dfA.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    spark.read().format("iceberg").load(location).show();
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1);
+
+    // Replicate first increment to target table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .execute();
+    copyTableFiles(result);
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);
+
+    // Write second increment to source table
+    List<ThreeColumnRecord> recordsB =
+        Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB"));
+    Dataset<Row> dfB = spark.createDataFrame(recordsB, 
ThreeColumnRecord.class).coalesce(1);
+    dfB.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    
assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2);
+
+    // Replicate second increment to target table
+    Table sourceTableIncrement = TABLES.load(location);

Review Comment:
   I dont think we need to reload, just use `sourceTable.refresh();` is enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to