szehon-ho commented on code in PR #8797:
URL: https://github.com/apache/iceberg/pull/8797#discussion_r1610846416
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java: ##########
@@ -254,6 +258,15 @@ private static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn add)
     }
   }
 
+  public static String extractBranch(Identifier ident) {

Review Comment:
   Looks like we can move SparkCatalog to use this one; ok to do in a subsequent PR.


########## api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java: ##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled
+   * via WAP(write-audit-publish)
+   *
+   * @param branch the branch where the rewrite happens

Review Comment:
   How about `branch on which the rewrite will be performed`?


########## docs/docs/spark-procedures.md: ##########
@@ -419,6 +421,18 @@ Rewrite the data files in table `db.sample` and select the files that may contai
 CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
 ```
 
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using WAP

Review Comment:
   `Rewrite the data files in table `db.sample` specifying target branch using WAP`


########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java: ##########
@@ -85,6 +87,66 @@ public void testRewriteDataFilesInEmptyTable() {
     assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
   }
 
+  @TestTemplate
+  public void testRewriteOnBranchWAP() throws Exception {
+    createPartitionTable();
+    // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+    insertData(10);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String branch = "op_audit";
+    table.manageSnapshots().createBranch(branch).commit();
+    table.refresh();
+    long branchSnapshotId = table.currentSnapshot().snapshotId();
+    insertData(10);
+    table.refresh();
+    spark.sql(
+        String.format("ALTER TABLE %s SET TBLPROPERTIES ('write.wap.enabled'='true')", tableName));
+    spark.sql(String.format("SET spark.wap.branch = %s", branch));
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+    List<Object[]> output =
+        sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
+    assertThat(Arrays.copyOf(output.get(0), 2))
+        .as("Action should rewrite 10 data files and add 2 data files (one per partition)")
+        .containsExactly(row(10, 2));
+    table.refresh();
+    assertThat(table.refs().get(branch).snapshotId())
+        .as("branch ref should have changed")
+        .isNotEqualTo(branchSnapshotId);
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("rewrite should happen on branch")

Review Comment:
   Why not describe the actual assertion, e.g. `main branch ref should not have changed`?


########## api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java: ##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled

Review Comment:
   Seems like there is a run-on sentence here. How about:
   ```
   Specify the branch on which the rewrite will be performed.
   ```
   That matches the Iceberg docs on branches more closely: https://iceberg.apache.org/docs/1.5.1/spark-writes/#update
   
   Is there anything specific we need to call out w.r.t. WAP? (It looks like the WAP branch is a Spark concept?)
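   For what it's worth, folding the suggested one-line description together with the `@param` and `@return` nits (the `@return` one appears further down in this message) would give a Javadoc roughly like the sketch below. This is only an illustration, not the PR's actual text: whether to keep any WAP sentence at all is the open question above, the extra `<p>` paragraph is just one hedged phrasing option, and the method is shown as a plain interface method even though the PR may declare it differently.
   ```java
     /**
      * Specify the branch on which the rewrite will be performed.
      *
      * <p>Optionally (pending the WAP question above): when write-audit-publish is enabled on the
      * table, the branch may also be chosen through Spark's {@code spark.wap.branch} session setting.
      *
      * @param branch the branch on which the rewrite will be performed
      * @return this for method chaining
      */
     RewriteDataFiles targetBranch(String branch);
   ```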
########## core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java: ##########
@@ -51,18 +53,25 @@ public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
 
   public RewriteDataFilesCommitManager(
       Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
-    this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
+    this(
+        table,
+        startingSnapshotId,
+        useStartingSequenceNumber,
+        ImmutableMap.of(),
+        SnapshotRef.MAIN_BRANCH);
   }
 
   public RewriteDataFilesCommitManager(
       Table table,
       long startingSnapshotId,
       boolean useStartingSequenceNumber,
-      Map<String, String> snapshotProperties) {
+      Map<String, String> snapshotProperties,
+      String branch) {

Review Comment:
   Did revapi not complain about breaking this interface? Should we keep the old constructor for backward compat? (One possible shape is sketched further down in this message.)


########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java: ##########
@@ -146,13 +148,33 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
     return this;
   }
 
+  @Override
+  public RewriteDataFiles targetBranch(String branch) {
+    this.targetBranch = branch;
+    SnapshotRef ref = table.refs().get(this.targetBranch);
+    Preconditions.checkArgument(
+        ref != null, String.format("Branch does not exist: %s", targetBranch));
+    Preconditions.checkArgument(
+        ref.isBranch(), String.format("Ref %s is not a branch", targetBranch));
+    return this;
+  }
+
+  protected long startingSnapshotId() {
+    if (SnapshotRef.MAIN_BRANCH.equals(this.targetBranch)) {

Review Comment:
   I think @aokolnychyi once told me that `this` is reserved for the target of an assignment.


########## api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java: ##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled
+   * via WAP(write-audit-publish)
+   *
+   * @param branch the branch where the rewrite happens
+   * @return this for chaining

Review Comment:
   To match the others: `this for method chaining`?


########## docs/docs/spark-procedures.md: ##########
@@ -337,6 +337,8 @@ Iceberg tracks each data file in a table. More data files leads to more metadata
 
 Iceberg can compact data files in parallel using Spark with the `rewriteDataFiles` action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost.
 
+It is also possible to use write audit publish (WAP) or branch identifier to rewrite on target branch.

Review Comment:
   Looks like we should support wap.id as well?
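   Circling back to the RewriteDataFilesCommitManager compatibility question above: a minimal sketch of keeping the previous four-argument constructor and delegating to the new branch-aware one, defaulting to the main branch. This is only one possible shape (whether it should also carry `@Deprecated` is a separate call), using the names that appear in the diff:
   ```java
     // Sketch: retain the old public constructor for source/binary compatibility and
     // forward to the new five-argument constructor, defaulting to the main branch.
     public RewriteDataFilesCommitManager(
         Table table,
         long startingSnapshotId,
         boolean useStartingSequenceNumber,
         Map<String, String> snapshotProperties) {
       this(
           table,
           startingSnapshotId,
           useStartingSequenceNumber,
           snapshotProperties,
           SnapshotRef.MAIN_BRANCH);
     }
   ```
   That would keep existing callers compiling (and should keep revapi quiet) while new call sites can pass an explicit branch.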
########## docs/docs/spark-procedures.md: ##########
@@ -419,6 +421,18 @@ Rewrite the data files in table `db.sample` and select the files that may contai
 CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
 ```
 
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using WAP
+```sql
+ALTER TABLE db.table SET TBLPROPERTIES ('write.wap.enabled'='true')
+SET spark.wap.branch = audit
+CALL catalog_name.system.rewrite_data_files('db.sample');
+```
+
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using branch identifier

Review Comment:
   `Rewrite the data files in table `db.sample` specifying target branch using branch identifier`


########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java: ##########
@@ -134,6 +138,21 @@ private RewriteDataFiles checkAndApplyFilter(
     return action;
   }
 
+  private RewriteDataFiles checkAndApplyBranch(
+      Table table, Identifier ident, RewriteDataFiles action) {
+    String branchIdent = Spark3Util.extractBranch(ident);
+    if (branchIdent != null) {
+      return action.targetBranch(branchIdent);
+    }
+    SparkWriteConf writeConf = new SparkWriteConf(spark(), table, Maps.newHashMap());

Review Comment:
   Nit: can we simplify like:
   ```
   String branchIdent = Spark3Util.extractBranch(ident);
   SparkWriteConf writeConf = new SparkWriteConf(spark(), table, branchIdent, ...);
   String targetBranch = writeConf.branch();
   if (targetBranch != null) {
     action.targetBranch(targetBranch);
   }
   return action;
   ```
   (A fuller sketch appears at the end of this message.)


########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java: ##########
@@ -85,6 +87,66 @@ public void testRewriteDataFilesInEmptyTable() {
     assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
   }
 
+  @TestTemplate
+  public void testRewriteOnBranchWAP() throws Exception {

Review Comment:
   Nit: would use camel case (see all other tests are called testSomethingWap).
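   Lastly, a fuller version of the `checkAndApplyBranch` simplification suggested above, for illustration only. It assumes `SparkWriteConf` offers a constructor that accepts the branch extracted from the identifier along with the write-options map already used in the PR, and that `writeConf.branch()` resolves the effective target branch (including the WAP branch when enabled); both assumptions should be checked against the actual `SparkWriteConf` API, and the `branchIdent`/`branch` naming from the suggestion is unified here.
   ```java
     // Hedged sketch of the suggested simplification; relies on the assumptions stated above.
     private RewriteDataFiles checkAndApplyBranch(
         Table table, Identifier ident, RewriteDataFiles action) {
       String branchIdent = Spark3Util.extractBranch(ident);
       SparkWriteConf writeConf = new SparkWriteConf(spark(), table, branchIdent, Maps.newHashMap());
       String targetBranch = writeConf.branch();
       if (targetBranch != null) {
         action.targetBranch(targetBranch);
       }
       return action;
     }
   ```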