szehon-ho commented on code in PR #8797:
URL: https://github.com/apache/iceberg/pull/8797#discussion_r1610846416


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -254,6 +258,15 @@ private static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn add)
     }
   }
 
+  public static String extractBranch(Identifier ident) {

Review Comment:
   Looks like we can move SparkCatalog to use this one; OK to do in a subsequent PR.
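   For context, a minimal sketch of what such a shared helper might look like (hypothetical, assuming the `branch_<name>` identifier convention; not necessarily the PR's actual implementation):

   ```java
   // Hypothetical sketch: extract a branch name from a Spark Identifier whose
   // last namespace element follows the "branch_<name>" convention,
   // e.g. db.sample.branch_audit -> "audit"; returns null when none is present.
   public static String extractBranch(Identifier ident) {
     String[] namespace = ident.namespace();
     if (namespace.length > 0) {
       String last = namespace[namespace.length - 1];
       if (last.startsWith("branch_")) {
         return last.substring("branch_".length());
       }
     }
     return null;
   }
   ```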



##########
api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java:
##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled
+   * via WAP(write-audit-publish)
+   *
+   * @param branch the branch where the rewrite happens

Review Comment:
   how about `branch on which the rewrite will be performed`



##########
docs/docs/spark-procedures.md:
##########
@@ -419,6 +421,18 @@ Rewrite the data files in table `db.sample` and select the files that may contai
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
 ```
 
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using WAP

Review Comment:
   `Rewrite the data files in table `db.sample` specifying target branch using WAP`



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -85,6 +87,66 @@ public void testRewriteDataFilesInEmptyTable() {
     assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 
0)), output);
   }
 
+  @TestTemplate
+  public void testRewriteOnBranchWAP() throws Exception {
+    createPartitionTable();
+    // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+    insertData(10);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String branch = "op_audit";
+    table.manageSnapshots().createBranch(branch).commit();
+    table.refresh();
+    long branchSnapshotId = table.currentSnapshot().snapshotId();
+    insertData(10);
+    table.refresh();
+    spark.sql(
+        String.format("ALTER TABLE %s SET TBLPROPERTIES ('write.wap.enabled'='true')", tableName));
+    spark.sql(String.format("SET spark.wap.branch = %s", branch));
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+    List<Object[]> output =
+        sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, 
tableIdent);
+    assertThat(Arrays.copyOf(output.get(0), 2))
+        .as("Action should rewrite 10 data files and add 2 data files (one per 
partition)")
+        .containsExactly(row(10, 2));
+    table.refresh();
+    assertThat(table.refs().get(branch).snapshotId())
+        .as("branch ref should have changed")
+        .isNotEqualTo(branchSnapshotId);
+    assertThat(table.currentSnapshot().snapshotId())
+        .as("rewrite should happen on branch")

Review Comment:
   Why not talk about the actual assert, like `main branch ref should not have changed`?
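   With that suggestion applied, the assertion might read (a sketch; it compares against the `lastSnapshotId` captured before the rewrite):

   ```java
   // Sketch: assert the rewrite left the main branch untouched.
   assertThat(table.currentSnapshot().snapshotId())
       .as("main branch ref should not have changed")
       .isEqualTo(lastSnapshotId);
   ```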



##########
api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java:
##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled

Review Comment:
   Seems like there is a run-on sentence? How about:
   
   ```Specify the branch on which the rewrite will be performed.```
   
   It better matches the Iceberg docs on branches: https://iceberg.apache.org/docs/1.5.1/spark-writes/#update
   
   Is there anything specific we need to call out wrt WAP? (Looks like the wap branch is a Spark concept?)
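   Pulling the wording suggestions in this review together, the Javadoc could end up roughly as follows (a sketch; the method name is taken from the RewriteDataFilesSparkAction hunk below):

   ```java
   /**
    * Specify the branch on which the rewrite will be performed.
    *
    * <p>The target branch can also be controlled via WAP (write-audit-publish),
    * e.g. Spark's spark.wap.branch session config.
    *
    * @param branch the branch on which the rewrite will be performed
    * @return this for method chaining
    */
   RewriteDataFiles targetBranch(String branch);
   ```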



##########
core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java:
##########
@@ -51,18 +53,25 @@ public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
 
   public RewriteDataFilesCommitManager(
       Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
-    this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
+    this(
+        table,
+        startingSnapshotId,
+        useStartingSequenceNumber,
+        ImmutableMap.of(),
+        SnapshotRef.MAIN_BRANCH);
   }
 
   public RewriteDataFilesCommitManager(
       Table table,
       long startingSnapshotId,
       boolean useStartingSequenceNumber,
-      Map<String, String> snapshotProperties) {
+      Map<String, String> snapshotProperties,
+      String branch) {

Review Comment:
   Did revapi not complain about breaking this public constructor? Should we keep the other signature for backward compat?
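   For illustration, a backward-compatible overload could simply delegate, mirroring the pattern in the constructor above (a sketch):

   ```java
   // Hypothetical compatibility overload: keep the old signature and default
   // the new branch parameter to the main branch.
   public RewriteDataFilesCommitManager(
       Table table,
       long startingSnapshotId,
       boolean useStartingSequenceNumber,
       Map<String, String> snapshotProperties) {
     this(
         table,
         startingSnapshotId,
         useStartingSequenceNumber,
         snapshotProperties,
         SnapshotRef.MAIN_BRANCH);
   }
   ```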



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -146,13 +148,33 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
     return this;
   }
 
+  @Override
+  public RewriteDataFiles targetBranch(String branch) {
+    this.targetBranch = branch;
+    SnapshotRef ref = table.refs().get(this.targetBranch);
+    Preconditions.checkArgument(
+        ref != null, String.format("Branch does not exist: %s", targetBranch));
+    Preconditions.checkArgument(
+        ref.isBranch(), String.format("Ref %s is not a branch", targetBranch));
+    return this;
+  }
+
+  protected long startingSnapshotId() {
+    if (SnapshotRef.MAIN_BRANCH.equals(this.targetBranch)) {

Review Comment:
   I think @aokolnychyi once told me that `this` is reserved for the target of an assignment.
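   Applied here, the convention would look like this (a sketch; the non-main branch lookup is assumed, since the hunk is truncated):

   ```java
   protected long startingSnapshotId() {
     // No `this.` on reads, per the convention above.
     if (SnapshotRef.MAIN_BRANCH.equals(targetBranch)) {
       return table.currentSnapshot().snapshotId();
     } else {
       // Assumed: start from the head of the target branch.
       return table.refs().get(targetBranch).snapshotId();
     }
   }
   ```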



##########
api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java:
##########
@@ -171,6 +171,17 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  /**
+   * A user provided branch which the rewrite happens on the target branch can also be controlled
+   * via WAP(write-audit-publish)
+   *
+   * @param branch the branch where the rewrite happens
+   * @return this for chaining

Review Comment:
   to match the others: `this for method chaining`?



##########
docs/docs/spark-procedures.md:
##########
@@ -337,6 +337,8 @@ Iceberg tracks each data file in a table. More data files leads to more metadata
 
 Iceberg can compact data files in parallel using Spark with the `rewriteDataFiles` action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost.
 
+It is also possible to use write audit publish (WAP) or branch identifier to rewrite on target branch.

Review Comment:
   Looks like we should support wap.id as well?
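   For reference, a `wap.id`-based flow would presumably look like the sketch below (hypothetical; with a WAP id the rewrite snapshot would be staged rather than committed to a branch, until cherry-picked):

   ```java
   // Hypothetical usage: stage a rewrite under a WAP id instead of a branch.
   // "rewrite-audit-1" is an arbitrary illustrative id.
   spark.sql("ALTER TABLE db.sample SET TBLPROPERTIES ('write.wap.enabled'='true')");
   spark.conf().set("spark.wap.id", "rewrite-audit-1");
   spark.sql("CALL catalog_name.system.rewrite_data_files(table => 'db.sample')");
   ```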



##########
docs/docs/spark-procedures.md:
##########
@@ -419,6 +421,18 @@ Rewrite the data files in table `db.sample` and select the files that may contai
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
 ```
 
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using WAP
+```sql
+ALTER TABLE db.table SET TBLPROPERTIES ('write.wap.enabled'='true')
+SET spark.wap.branch = audit
+CALL catalog_name.system.rewrite_data_files('db.sample');
+```
+
+Rewrite the data files in table `db.sample` specifying target branch rewrite happens on using branch identifier

Review Comment:
   `Rewrite the data files in table `db.sample` specifying target branch using branch identifier`
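   The branch-identifier variant would then presumably be invoked as (hypothetical, following Iceberg's `branch_<name>` identifier convention):

   ```java
   // Hypothetical: target the branch directly through the identifier rather
   // than WAP session configuration.
   spark.sql("CALL catalog_name.system.rewrite_data_files(table => 'db.sample.branch_audit')");
   ```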



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java:
##########
@@ -134,6 +138,21 @@ private RewriteDataFiles checkAndApplyFilter(
     return action;
   }
 
+  private RewriteDataFiles checkAndApplyBranch(
+      Table table, Identifier ident, RewriteDataFiles action) {
+    String branchIdent = Spark3Util.extractBranch(ident);
+    if (branchIdent != null) {
+      return action.targetBranch(branchIdent);
+    }
+    SparkWriteConf writeConf = new SparkWriteConf(spark(), table, Maps.newHashMap());

Review Comment:
   Nit: can we simplify like:
   
   ```
   String branchIdent = Spark3Util.extractBranch(ident);
   SparkWriteConf writeConf = new SparkWriteConf(spark(), table, branch, ...);
   String targetBranch = writeConf.branch();
   if (targetBranch != null) {
      action.targetBranch(targetBranch);
   }
   return action;
   }
   ```
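   A fuller rendering of that suggestion might be (a sketch, assuming the `SparkWriteConf(spark, table, branch, writeOptions)` overload that takes an explicit branch):

   ```java
   private RewriteDataFiles checkAndApplyBranch(
       Table table, Identifier ident, RewriteDataFiles action) {
     String branchIdent = Spark3Util.extractBranch(ident);
     // A branch encoded in the identifier wins; otherwise SparkWriteConf can
     // resolve one from session config (e.g. spark.wap.branch).
     SparkWriteConf writeConf =
         new SparkWriteConf(spark(), table, branchIdent, Maps.newHashMap());
     String targetBranch = writeConf.branch();
     if (targetBranch != null) {
       action.targetBranch(targetBranch);
     }
     return action;
   }
   ```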



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -85,6 +87,66 @@ public void testRewriteDataFilesInEmptyTable() {
     assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 
0)), output);
   }
 
+  @TestTemplate
+  public void testRewriteOnBranchWAP() throws Exception {

Review Comment:
   Nit: would use camel case (all the other tests are named testSomethingWap).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

