pvary commented on code in PR #12692: URL: https://github.com/apache/iceberg/pull/12692#discussion_r2067879419
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java: ########## @@ -56,16 +58,18 @@ protected void doRewrite(String groupId, List<FileScanTask> group) { .write() .format("iceberg") .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) - .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) - .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName()) - .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId()) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, group.maxOutputFileSize()) + .option( + SparkWriteOptions.DISTRIBUTION_MODE, + distributionMode(group.fileScanTasks(), spec(group.outputSpecId())).modeName()) + .option(SparkWriteOptions.OUTPUT_SPEC_ID, group.outputSpecId()) .mode("append") .save(groupId); } // invoke a shuffle if the original spec does not match the output spec - private DistributionMode distributionMode(List<FileScanTask> group) { - boolean requiresRepartition = !group.get(0).spec().equals(outputSpec()); + private DistributionMode distributionMode(List<FileScanTask> group, PartitionSpec outputSpec) { Review Comment: Good idea. 
Done ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java: ########## @@ -120,36 +111,39 @@ protected RewriteDataFilesSparkAction self() { @Override public RewriteDataFilesSparkAction binPack() { - Preconditions.checkArgument( - rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); - this.rewriter = new SparkBinPackDataRewriter(spark(), table); + checkRunnerIsUnset(); + this.runner = new SparkBinPackFileRewriteRunner(spark(), table); return this; } @Override public RewriteDataFilesSparkAction sort(SortOrder sortOrder) { - Preconditions.checkArgument( - rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); - this.rewriter = new SparkSortDataRewriter(spark(), table, sortOrder); + checkRunnerIsUnset(); + this.runner = new SparkSortFileRewriteRunner(spark(), table, sortOrder); return this; } @Override public RewriteDataFilesSparkAction sort() { - Preconditions.checkArgument( - rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); - this.rewriter = new SparkSortDataRewriter(spark(), table); + checkRunnerIsUnset(); + this.runner = new SparkSortFileRewriteRunner(spark(), table); return this; } @Override public RewriteDataFilesSparkAction zOrder(String... columnNames) { - Preconditions.checkArgument( - rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); - this.rewriter = new SparkZOrderDataRewriter(spark(), table, Arrays.asList(columnNames)); + checkRunnerIsUnset(); + this.runner = new SparkZOrderFileRewriteRunner(spark(), table, Arrays.asList(columnNames)); return this; } + private void checkRunnerIsUnset() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org