dramaticlly commented on code in PR #12172: URL: https://github.com/apache/iceberg/pull/12172#discussion_r1945867503
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java: ########## @@ -235,6 +235,51 @@ public void testStartVersion() throws Exception { .isEqualTo(0); } + @Test + public void testIncrementalRewrite() throws Exception { + String location = newTableLocation(); + Table sourceTable = + TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location); + List<ThreeColumnRecord> recordsA = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1); + + // Write first increment to source table + dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + spark.read().format("iceberg").load(location).show(); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1); + + // Replicate first increment to target table + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTable.location(), targetTableLocation()) + .execute(); + copyTableFiles(result); + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1); + + // Write second increment to source table + List<ThreeColumnRecord> recordsB = + Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB")); + Dataset<Row> dfB = spark.createDataFrame(recordsB, ThreeColumnRecord.class).coalesce(1); + dfB.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2); + + // Replicate second increment to target table + Table sourceTableIncrement = TABLES.load(location); + Table targetTable = TABLES.load(targetTableLocation()); + String targetTableMetadata = currentMetadata(targetTable).metadataFileLocation(); + String startVersion = fileName(targetTableMetadata); + RewriteTablePath.Result resultB = + actions() + 
.rewriteTablePath(sourceTableIncrement) + .rewriteLocationPrefix(sourceTable.location(), targetTableLocation()) + .startVersion(startVersion) + .execute(); + copyTableFiles(resultB); + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2); Review Comment: instead of verifying the count, let's verify the data read from both source and target tables ```java // might need orderBy to ensure deterministic ordering List<Object[]> actual = rows(targetTableLocation()); List<Object[]> expected = rows(location); assertEquals("Rows should match after copy", expected, actual); ``` ########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java: ########## @@ -235,6 +235,51 @@ public void testStartVersion() throws Exception { .isEqualTo(0); } + @Test + public void testIncrementalRewrite() throws Exception { + String location = newTableLocation(); + Table sourceTable = + TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location); + List<ThreeColumnRecord> recordsA = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1); + + // Write first increment to source table + dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + spark.read().format("iceberg").load(location).show(); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1); + + // Replicate first increment to target table + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTable.location(), targetTableLocation()) + .execute(); + copyTableFiles(result); + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1); + + // Write second increment to source table + List<ThreeColumnRecord> recordsB = + Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB")); + 
Dataset<Row> dfB = spark.createDataFrame(recordsB, ThreeColumnRecord.class).coalesce(1); + dfB.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2); + + // Replicate second increment to target table + Table sourceTableIncrement = TABLES.load(location); + Table targetTable = TABLES.load(targetTableLocation()); + String targetTableMetadata = currentMetadata(targetTable).metadataFileLocation(); + String startVersion = fileName(targetTableMetadata); + RewriteTablePath.Result resultB = Review Comment: let's call it `incrementalRewriteResult` instead of `resultB` ########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java: ########## @@ -235,6 +235,51 @@ public void testStartVersion() throws Exception { .isEqualTo(0); } + @Test + public void testIncrementalRewrite() throws Exception { + String location = newTableLocation(); + Table sourceTable = + TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location); + List<ThreeColumnRecord> recordsA = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1); + + // Write first increment to source table + dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + spark.read().format("iceberg").load(location).show(); Review Comment: looks like this is only needed for debugging? 
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java: ########## @@ -235,6 +235,51 @@ public void testStartVersion() throws Exception { .isEqualTo(0); } + @Test + public void testIncrementalRewrite() throws Exception { + String location = newTableLocation(); + Table sourceTable = + TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location); + List<ThreeColumnRecord> recordsA = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1); + + // Write first increment to source table + dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + spark.read().format("iceberg").load(location).show(); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1); + + // Replicate first increment to target table + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTable.location(), targetTableLocation()) + .execute(); + copyTableFiles(result); + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1); + + // Write second increment to source table + List<ThreeColumnRecord> recordsB = + Lists.newArrayList(new ThreeColumnRecord(1, "BBBBBBBBB", "BBB")); + Dataset<Row> dfB = spark.createDataFrame(recordsB, ThreeColumnRecord.class).coalesce(1); + dfB.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2); + + // Replicate second increment to target table + Table sourceTableIncrement = TABLES.load(location); Review Comment: I don't think we need to reload; just using `sourceTable.refresh();` is enough -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org