pvary commented on code in PR #12692:
URL: https://github.com/apache/iceberg/pull/12692#discussion_r2067879419


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java:
##########
@@ -56,16 +58,18 @@ protected void doRewrite(String groupId, List<FileScanTask> group) {
         .write()
         .format("iceberg")
         .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
-        .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
-        .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName())
-        .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
+        .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, group.maxOutputFileSize())
+        .option(
+            SparkWriteOptions.DISTRIBUTION_MODE,
+            distributionMode(group.fileScanTasks(), spec(group.outputSpecId())).modeName())
+        .option(SparkWriteOptions.OUTPUT_SPEC_ID, group.outputSpecId())
         .mode("append")
         .save(groupId);
   }
 
   // invoke a shuffle if the original spec does not match the output spec
-  private DistributionMode distributionMode(List<FileScanTask> group) {
-    boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
+  private DistributionMode distributionMode(List<FileScanTask> group, PartitionSpec outputSpec) {

Review Comment:
   Good idea.
   Done



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -120,36 +111,39 @@ protected RewriteDataFilesSparkAction self() {
 
   @Override
   public RewriteDataFilesSparkAction binPack() {
-    Preconditions.checkArgument(
-        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
-    this.rewriter = new SparkBinPackDataRewriter(spark(), table);
+    checkRunnerIsUnset();
+    this.runner = new SparkBinPackFileRewriteRunner(spark(), table);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
-    Preconditions.checkArgument(
-        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
-    this.rewriter = new SparkSortDataRewriter(spark(), table, sortOrder);
+    checkRunnerIsUnset();
+    this.runner = new SparkSortFileRewriteRunner(spark(), table, sortOrder);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction sort() {
-    Preconditions.checkArgument(
-        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
-    this.rewriter = new SparkSortDataRewriter(spark(), table);
+    checkRunnerIsUnset();
+    this.runner = new SparkSortFileRewriteRunner(spark(), table);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction zOrder(String... columnNames) {
-    Preconditions.checkArgument(
-        rewriter == null, "Must use only one rewriter type (bin-pack, sort, 
zorder)");
-    this.rewriter = new SparkZOrderDataRewriter(spark(), table, Arrays.asList(columnNames));
+    checkRunnerIsUnset();
+    this.runner = new SparkZOrderFileRewriteRunner(spark(), table, Arrays.asList(columnNames));
     return this;
   }
 
+  private void checkRunnerIsUnset() {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to