advancedxy commented on code in PR #7731:
URL: https://github.com/apache/iceberg/pull/7731#discussion_r1210362126


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -79,6 +88,44 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  /**
+   * Produces {@link CombinedScanTask combined tasks} from an iterable of {@link FileScanTask file
+   * tasks}, using an adaptive target split size that targets a minimum number of tasks
+   * (parallelism).
+   *
+   * @param files incoming iterable of file tasks
+   * @param parallelism target minimum number of tasks
+   * @param splitSize target split size
+   * @param lookback bin packing lookback
+   * @param openFileCost minimum file cost
+   * @return an iterable of combined tasks
+   */
+  public static CloseableIterable<CombinedScanTask> planTasksAdaptive(
+      CloseableIterable<FileScanTask> files,
+      int parallelism,
+      long splitSize,
+      int lookback,
+      long openFileCost) {
+
+    validatePlanningArguments(splitSize, lookback, openFileCost);
+
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length()
+                    + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),

Review Comment:
   I think these two lines should be replaced with `file.sizeBytes()`, like the `weightFunc` in
   
   https://github.com/apache/iceberg/pull/7731/files#diff-0b0affef002de35215ed5459cf6963fd0d5bf6d5403e67440b7ef97a35189454R170-R172
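   
   For illustration only, a minimal sketch of the suggested weight function (the wrapper class and method name are hypothetical, and it assumes `FileScanTask#sizeBytes()` already folds in the delete file sizes, as the linked `weightFunc` appears to rely on):
   ```
   import java.util.function.Function;
   
   import org.apache.iceberg.FileScanTask;
   
   class AdaptiveWeightSketch {
     // Hypothetical sketch: weigh each task by sizeBytes() instead of recomputing
     // length() + delete file sizes by hand; the openFileCost floor is unchanged.
     static Function<FileScanTask, Long> weightFunc(long openFileCost) {
       return file -> Math.max(file.sizeBytes(), (1 + file.deletes().size()) * openFileCost);
     }
   }
   ```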
   
   The original logic at L91-L92 in `planTasks` might not be optimal:
   ```
     public static CloseableIterable<CombinedScanTask> planTasks(
         CloseableIterable<FileScanTask> splitFiles, long splitSize, int lookback, long openFileCost) {
   
       validatePlanningArguments(splitSize, lookback, openFileCost);
       // Check the size of delete file as well to avoid unbalanced bin-packing
       Function<FileScanTask, Long> weightFunc =
           file ->
               Math.max(
                   file.length()
                       + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
                   (1 + file.deletes().size()) * openFileCost);
   ```
   
   `file.length()` returns the length of the split/task, which includes all the columns in a row group (for Parquet). However, if the table is scanned selectively, only a small portion of the columns is actually read (column pruning), so the actual scanned size would be much smaller.
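   
   As a rough, hypothetical illustration of that point (not something proposed in this PR), the bytes actually read could in principle be estimated from the per-column size stats when they are present; note that `columnSizes()` is a whole-file stat, so for a split this is only a coarse upper bound:
   ```
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.iceberg.FileScanTask;
   
   class ProjectedSizeSketch {
     // Hypothetical helper: sum the on-disk sizes of only the projected columns.
     static long estimateProjectedBytes(FileScanTask task, Set<Integer> projectedFieldIds) {
       Map<Integer, Long> columnSizes = task.file().columnSizes();
       if (columnSizes == null) {
         return task.length(); // no column stats: fall back to the split length
       }
   
       return columnSizes.entrySet().stream()
           .filter(entry -> projectedFieldIds.contains(entry.getKey()))
           .mapToLong(entry -> entry.getValue())
           .sum();
     }
   }
   ```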



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

